schedules.py

    import logging
    from datetime import timedelta
    from hashlib import sha1
    from typing import TYPE_CHECKING, Dict, List, Optional
    from uuid import UUID, uuid4
    
    from django.core.exceptions import ObjectDoesNotExist
    from django.db import models
    from django.utils import timezone
    from django.utils.translation import gettext_lazy as _
    
    from ..fields import ConferenceReference
    from ..schedules import ScheduleTypeManager
    from ..utils import mail2uuid, mask_url, str2bool
    from .assemblies import Assembly
    from .events import Event, EventAttachment
    from .rooms import Room
    from .users import PlatformUser
    
    if TYPE_CHECKING:
        pass
    
    logger = logging.getLogger(__name__)
    
    
    class LocalObjectAccessViolation(Exception):
        """
        Indicates that the referenced/requested local object
        (e.g. in a ScheduleSourceMapping)
        is not accessible to the ScheduleSource's assembly.
        """
    
    
    class ScheduleSource(models.Model):
        id = models.UUIDField(primary_key=True, editable=False, default=uuid4)
    
        conference = ConferenceReference(related_name='schedule_sources')
        assembly = models.ForeignKey(Assembly, blank=True, null=True, related_name='schedule_sources', on_delete=models.CASCADE)
    
        import_type = models.CharField(
            blank=False,
            max_length=20,
            help_text=_('ScheduleSource__import_type__help'),
            verbose_name=_('ScheduleSource__import_type'),
        )
        import_url = models.URLField(
            blank=False,
            help_text=_('ScheduleSource__import_url__help'),
            verbose_name=_('ScheduleSource__import_url'),
        )
        import_configuration = models.JSONField(
            blank=True,
            null=True,
            help_text=_('ScheduleSource__import_configuration__help'),
            verbose_name=_('ScheduleSource__import_configuration'),
        )
    
        import_frequency = models.DurationField(
            default=timedelta(hours=1),
            help_text=_('ScheduleSource__import_frequency__help'),
            verbose_name=_('ScheduleSource__import_frequency'),
        )
        import_timeout = models.DurationField(
            default=timedelta(minutes=5),
            help_text=_('ScheduleSource__import_timeout__help'),
            verbose_name=_('ScheduleSource__import_timeout'),
        )
        last_import = models.DateTimeField(
            blank=True,
            null=True,
            help_text=_('ScheduleSource__last_import__help'),
            verbose_name=_('ScheduleSource__last_import'),
        )
    
        def __str__(self):
            return f'{self.import_type}{{{self.import_url_masked}}}'
    
        @property
        def import_url_masked(self):
            return mask_url(self.import_url) if self.import_url is not None else None
    
        @property
        def import_configuration_masked(self):
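            """
            Return import_configuration with the 'auth' entry masked, e.g. (illustrative values)
            {'auth': 'secret-token', 'verify': True} becomes {'auth': '********', 'verify': True}.
            """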
            if self.import_configuration and 'auth' in self.import_configuration:
                return {**self.import_configuration, 'auth': '********'}
            return self.import_configuration
    
        @property
        def is_due(self):
            """
            Query if an import is due based on the last successful import and configured frequency.
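
            A frequency of zero disables automatic imports.

            Illustrative example (the field values below are made up):
            ```python
            source = ScheduleSource(
                import_frequency=timedelta(hours=1),
                last_import=timezone.now() - timedelta(hours=2),
            )
            assert source.is_due  # more than one hour has passed since the last import
            ```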
            """
    
            # a frequency of 0 means: don't import automatically
            if self.import_frequency.total_seconds() == 0:
                return False
    
            # no imports yet? we're due!
            if self.last_import is None:
                return True
    
            # calculate elapsed time and compare it against configured frequency
            timespan_since_last_import = timezone.now() - self.last_import
            return timespan_since_last_import >= self.import_frequency
    
        @property
        def latest_import(self) -> Optional['ScheduleSourceImport']:
            result = self.imports.order_by('-start').first()
            return result
    
        @property
        def has_running_import(self):
            latest_import = self.latest_import
            if latest_import is None:
                # no import at all, therefore no running one
                return False
    
            # if latest_import has completed, there is no running one (by definition)
            if not latest_import.is_inprogress:
                return False
    
            # check that timeout has not passed yet
            return not latest_import.is_beyond_deadline
    
        def _get_or_create_speaker(
            self,
            name: str,
            mail_guid: Optional[str | UUID] = None,
            addresses: Optional[List[str]] = None,
        ):
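            """
            Resolve (or create) the PlatformUser acting as a speaker.

            Resolution order: a deterministic '_speaker_' username derived from mail_guid,
            the sorted addresses or the name; then a conference member with a matching
            verified mail address; then an existing imported speaker with the same
            display name. If nothing matches, a new PlatformUser of type SPEAKER is created.

            Returns a (user, created) tuple.
            """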
            if not name:
                raise ValueError('You need to provide a name for the speaker.')
    
            if mail_guid and not isinstance(mail_guid, UUID):
                mail_guid = UUID(mail_guid)
    
            if mail_guid:
                speaker_username = '_speaker_' + str(mail_guid)
            elif addresses:
                speaker_username = '_speaker_' + sha1('\n'.join(sorted(addresses)).encode('utf-8')).hexdigest()
            else:
                speaker_username = '_speaker_' + sha1(name.encode('utf-8')).hexdigest()
    
            # try to find by the username
            if candidate := PlatformUser.objects.filter(username=speaker_username, user_type=PlatformUser.Type.SPEAKER).first():
                return candidate, False
    
            # we try to find a PlatformUser who has a verified email matching the given mail_guid or any of the supplied addresses
            candidates = []  # type: List[PlatformUser]
            for cm in self.conference.users.select_related('user').filter(user__user_type__in=PlatformUser.PERSON_TYPES).iterator():  # type: ConferenceMember
                for addr in cm.user.get_verified_mail_addresses():
                    if mail_guid and mail_guid == mail2uuid(addr):
                        candidates.append(cm.user)
                        break  # exit the inner for-loop
                    if addresses and addr in addresses:
                        candidates.append(cm.user)
                        break  # exit the inner for-loop
    
            # the good case: we found something \o/
            if len(candidates) == 1:
                return candidates[0], False
    
            # the very bad case: we found too many
            if len(candidates) > 1:
                raise ValueError('Multiple candidate speakers found: ' + '; '.join(str(x.pk) for x in candidates))
    
            # hail mary attempt: see if we have an imported speaker with the same name
            candidates = self.conference.users.select_related('user').filter(user__user_type=PlatformUser.Type.SPEAKER, user__display_name=name).all()
            if len(candidates) == 1:
                return candidates[0].user, False
    
            # the expected case: nothing found, create a new one
            name_split = name.split(' ')
            user_kwargs = {
                'user_type': PlatformUser.Type.SPEAKER,
                'username': speaker_username,
                'show_name': True,
                'last_name': name_split.pop(),
                'first_name': ' '.join(name_split),
            }
    
            # set an existing UUID, if possible
            if mail_guid:
                user_kwargs['uuid'] = str(mail_guid)
    
            # create the PlatformUser
            new_user = PlatformUser(**user_kwargs)
            new_user.save()
    
            return new_user, True
    
        def get_or_create_mapping(self, mapping_type, source_id, create_local_object=True, source_uuid=None, hints: dict | None = None):
            """
            Fetches the local object mapped by the given type and id.
            If the mapping does not exist yet, a new one is created.
    
            For a new mapping (and unless create_local_object is set to False),
            the local object is also created if mapping_type is a room or event.
            For event attachments the associated event would be needed,
            therefore no automatic creation is possible.
            Speakers are resolved or created via _get_or_create_speaker() based on the given hints.
    
            Returns a tuple: mapping, created flag
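
            Illustrative usage (assumes `source` is a saved ScheduleSource; the id is made up):
            ```python
            mapping, created = source.get_or_create_mapping(
                mapping_type=ScheduleSourceMapping.MappingType.ROOM,
                source_id='saal-1',
            )
            room = mapping.local_object
            ```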
            """
            try:
                mapping = self.mappings.get(mapping_type=mapping_type, source_id=source_id)
                return mapping, False
    
            except ObjectDoesNotExist:
                lo = None
                if create_local_object:
                    if mapping_type == ScheduleSourceMapping.MappingType.ROOM:
                        try:
                            if source_uuid:
                                lo = Room.objects.get(conference=self.conference, pk=source_uuid)
                            elif self.assembly:
                                lo = Room.objects.get(conference=self.conference, assembly=self.assembly, name__iexact=source_id)
                            else:
                                lo = Room.objects.get(conference=self.conference, name__iexact=source_id)
                        except Room.MultipleObjectsReturned:
                            raise ValueError('Room name is not unique, please provide room guid')
                        except Room.DoesNotExist:
                            if self.assembly:
                                lo = Room(conference=self.conference, pk=source_uuid, assembly=self.assembly)
                            else:
                                raise ValueError('Cannot create room for wildcard schedule source.')
    
                    elif mapping_type == ScheduleSourceMapping.MappingType.EVENT:
                        assembly = self.assembly
                        if assembly is None:
                            # wildcard schedulesource: lookup assembly based on referenced room
                            if hints and 'room_lookup' in hints and 'room_name' in hints:
                                r = hints['room_lookup'](hints['room_name'])
                                assembly = r.assembly
    
                            if assembly is None:
                                raise NotImplementedError('Event on wildcard schedulesource could not be matched to an assembly.')
    
                        lo = Event(conference=self.conference, pk=source_uuid, assembly=assembly)
    
                    elif mapping_type == ScheduleSourceMapping.MappingType.SPEAKER:
                        lo, _created = self._get_or_create_speaker(
                            mail_guid=source_uuid,
                            name=hints.get('name'),
                            addresses=hints.get('addresses'),
                        )
    
                mapping = self.mappings.create(
                    mapping_type=mapping_type,
                    source_id=source_id,
                    local_id=lo.pk if lo else None,
                    local_object=lo,
                )
                return mapping, True
    
        def _load_dataitem(self, activity: list, item: dict, item_source_id: str, item_type: str, expected_items: list, items: dict, from_dict_args: dict) -> str:
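            """
            Load a single room/event/speaker dict from the source data into the local database,
            creating or updating the corresponding ScheduleSourceMapping and local object.

            Returns one of 'added', 'seen', 'deleted', 'skipped' or 'error' and appends a
            matching entry to the activity log.
            """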
            if item_source_id is None:
                raise ValueError(f'Each {item_type} must have a unique "id" field.')
            item_delete = str2bool(item.pop('_delete', 'n')) is True
    
            item_source_uuid = None
            try:
                if not isinstance(item_source_id, int):
                    item_source_uuid = UUID(item_source_id)
            except ValueError:
                pass
            try:
                candidate = item.get('uuid') or item.get('guid')
                if item_source_uuid is None and candidate and not isinstance(candidate, int):
                    item_source_uuid = UUID(candidate)
            except ValueError:
                pass
    
            hints = {}
            if item_type == 'event':
                hints['room_lookup'] = from_dict_args.get('room_lookup')
                hints['room_name'] = item.get('room')
                hints['speaker_lookup'] = from_dict_args.get('speaker_lookup')
            elif item_type == 'speaker':
                hints['name'] = item.get('full_public_name') or item.get('public_name') or item.get('name')
                hints['addresses'] = item.get('addresses')
    
            mapping, new_mapping = self.get_or_create_mapping(
                mapping_type=item_type,
                source_id=str(item_source_id),
                create_local_object=not item_delete,
                source_uuid=item_source_uuid,
                hints=hints,
            )
    
            # an item we don't know about shall be deleted? skip it
            if mapping.skip or (new_mapping and item_delete):
                # new mapping got created already? delete it!
                if new_mapping and mapping.pk:
                    mapping.delete()
    
                # log us skipping the entry
                activity.append(
                    {
                        'action': 'skipped',
                        'type': item_type,
                        'source_id': item_source_id,
                        'local_id': None,
                    }
                )
    
                # if we have the item locally but shall skip it, handle it as 'handled' anyway
                # so that it is not picked up again as e.g. 'missing_events'
                if not new_mapping and mapping.local_id and mapping.local_id in expected_items:
                    expected_items.remove(mapping.local_id)
    
                # signal that we didn't actually load an item
                return 'skipped'
    
            if new_mapping:
                try:
                    # load item's data
                    if self.assembly:
                        # assign the room's/event's assembly from this source
                        from_dict_args['assembly'] = self.assembly
                    elif 'assembly' in from_dict_args:
                        # better be safe: disallow overwriting assembly in the wildcard case
                        del from_dict_args['assembly']
                    mapping.local_object.from_dict(
                        conference=self.conference,
                        existing=mapping.local_object,
                        data=item,
                        **from_dict_args,
                    )
    
                    # save the item with the freshly loaded data
                    mapping.local_object.save()
    
                    # update the mapping's local_id
                    mapping.local_id = mapping.local_object.pk
                    mapping.save()
    
                except Exception as err:
                    # log the error
                    items[item_source_id] = None
                    activity.append(
                        {
                            'action': 'error',
                            'type': item_type,
                            'source_id': item_source_id,
                            'local_id': None,
                            'message': str(err),
                        }
                    )
                    logger.exception('Import on ScheduleSource %s encountered exception on creating mapping for %s "%s".', self.pk, item_type, item_source_id)
    
                    # ... and delete the incomplete (wrong) mapping
                    mapping.delete()
                    return 'error'
    
                else:
                    # store new item's data
                    items[item_source_id] = mapping.local_object
                    activity.append(
                        {
                            'action': 'added',
                            'type': item_type,
                            'source_id': item_source_id,
                            'local_id': str(mapping.local_id),
                        }
                    )
                return 'added'
    
            # note that we've seen the existing item in the imported data
            if mapping.local_id in expected_items:
                expected_items.remove(mapping.local_id)
            else:
                logger.warning('[job %s] Local %s id (%s) is not in the list of expected items.', self.id, item_type, mapping.local_id)
    
            # existing item shall be deleted
            if item_delete:
                mapping.local_object.delete()
                items[item_source_id] = None
                activity.append(
                    {
                        'action': 'deleted',
                        'type': item_type,
                        'source_id': item_source_id,
                        'local_id': str(mapping.local_id),
                    }
                )
                return 'deleted'
    
            # update data on the existing item
            if self.assembly:
                # assign the room's/event's assembly from this source
                from_dict_args['assembly'] = self.assembly
            elif 'assembly' in from_dict_args:
                # better be safe: disallow overwriting assembly in the wildcard case
                del from_dict_args['assembly']
            mapping.local_object.from_dict(
                conference=self.conference,
                existing=mapping.local_object,
                data=item,
                **from_dict_args,
            )
            mapping.local_object.save()
    
            items[item_source_id] = mapping.local_object
            activity.append(
                {
                    'action': 'seen',  # TODO: set to 'changed' if data was changed (returned by .from_dict()?)
                    'type': item_type,
                    'source_id': item_source_id,
                    'local_id': str(mapping.local_id),
                }
            )
            return 'seen'
    
        def load_data(self, data):
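            """
            Import a complete schedule snapshot.

            `data` must be a dict with 'rooms' and 'events' dicts keyed by their source ids;
            speakers are resolved lazily via the nested speaker_lookup() helper. Rooms are
            loaded first (events reference them). Previously mapped rooms and events that no
            longer appear in the data are flagged as 'missing'; missing events are additionally
            deleted or depublished depending on the 'missing_events' configuration.

            Returns the activity log: a list of dicts with 'action', 'type', 'source_id' and 'local_id'.
            """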
            assert isinstance(data, dict)
            assert 'rooms' in data and isinstance(data['rooms'], dict), 'There are no rooms in the data.'
            assert 'events' in data and isinstance(data['events'], dict), 'There are no events in the data.'
    
            # initialize activity log and list of events/rooms
            activity = []
            events = {}
            rooms = {}
            speakers = {}
    
            # derive some flags
            cfg = self.import_configuration or {}
            missing_events = cfg.get('missing_events') or 'ignore'
            replace_conference_slug_prefix = cfg.get('replace_conference_slug_prefix')
            allow_track = cfg.get('import_tracks') or False
    
            # note down all existing rooms, events and speakers so that we can call out the missing ones
            if self.assembly:
                expected_rooms = list(self.assembly.rooms.values_list('id', flat=True))
            else:
                expected_rooms = list(
                    self.mappings.filter(
                        mapping_type=ScheduleSourceMapping.MappingType.ROOM,
                    ).values_list('local_id', flat=True)
                )
            expected_events = list(
                self.mappings.filter(
                    mapping_type=ScheduleSourceMapping.MappingType.EVENT,
                ).values_list('local_id', flat=True)
            )
            expected_speakers = list(
                self.mappings.filter(
                    mapping_type=ScheduleSourceMapping.MappingType.SPEAKER,
                ).values_list('local_id', flat=True)
            )
    
            def speaker_lookup(speaker_info: Dict[str, str]):
                """
                Try to match the given speaker dict to a PlatformUser, if necessary creating a virtual one in the process.
                Returns None if the speaker shall be skipped (explicitly, using ScheduleSourceMapping.skip=True).
    
                Example:
                ```json
                {
                  "id": 4711,
                  "guid": "c25334d0-9539-55e3-92b4-f559c384522b",
                  "name": "Hub Team",
                  "links": [
                    {
                      "url": "https://git.cccv.de/hub/hub",
                      "title": "Quellcode"
                    }
                  ],
                  "biography": "Das Projekt-Team vom Hub, der Daten-Integrationsplattform von Congress & Camp.",
                  "avatar": "https://www.ccc.de/images/events.png",
                  "public_name": "Hub Team"
                }
                ```
                """
    
                # for the source id use the provided id, uuid or guid field (in order)
                speaker_id = speaker_info.get('id') or speaker_info.get('uuid') or speaker_info.get('guid')
    
                # sanity check: verify that required attributes are present
                if not speaker_id:
                    raise ValueError('Missing required attribute in speaker_info: id/uuid/guid')
    
                try:
                    action = self._load_dataitem(
                        activity=activity,
                        item=speaker_info,
                        item_source_id=speaker_id,
                        item_type='speaker',
                        expected_items=expected_speakers,
                        items=speakers,
                        from_dict_args={},
                    )
    
                    if action == 'skipped':
                        # speaker was skipped: still resolve its existing mapping so callers get the mapped local object
                        spk_mapping = self.mappings.get(mapping_type=ScheduleSourceMapping.MappingType.SPEAKER, source_id=speaker_id)
                        assert spk_mapping.skip
                        speakers[speaker_id] = spk_mapping.local_object
    
                except Exception as err:
                    activity.append(
                        {
                            'action': 'error',
                            'type': 'speaker',
                            'source_id': speaker_id,
                            'local_id': None,
                            'message': str(err),
                        }
                    )
                    logger.exception('Import on ScheduleSource %s encountered exception on loading speaker "%s".', self.pk, speaker_id)
    
                return speakers.get(speaker_id)
    
            # first, load the rooms (as they're needed for events)
            for r_id, r in data['rooms'].items():
                try:
                    if replace_conference_slug_prefix and r.get('slug', '').startswith(replace_conference_slug_prefix):
                        r['slug'] = self.conference.slug + r['slug'][len(replace_conference_slug_prefix) :]
    
                    action = self._load_dataitem(
                        activity=activity,
                        item=r,
                        item_source_id=r_id,
                        item_type='room',
                        expected_items=expected_rooms,
                        items=rooms,
                        from_dict_args={
                            'allow_backend_roomtypes': False,
                        },
                    )
    
                    if action == 'skipped':
                        # room was skipped: still resolve its existing mapping so callers get the mapped local object
                        r_mapping = self.mappings.get(mapping_type=ScheduleSourceMapping.MappingType.ROOM, source_id=r_id)
                        assert r_mapping.skip
                        rooms[r_id] = r_mapping.local_object
    
                except Exception as err:
                    activity.append(
                        {
                            'action': 'error',
                            'type': 'room',
                            'source_id': r_id,
                            'local_id': None,
                            'message': str(err),
                        }
                    )
                    logger.exception('Import on ScheduleSource %s encountered exception on loading room "%s".', self.pk, r_id)
    
            # then load events
            for e_id, e in data['events'].items():
                try:
                    if replace_conference_slug_prefix and e.get('slug', '').startswith(replace_conference_slug_prefix):
                        e['slug'] = self.conference.slug + e['slug'][len(replace_conference_slug_prefix) :]
    
                    self._load_dataitem(
                        activity=activity,
                        item=e,
                        item_source_id=e_id,
                        item_type='event',
                        expected_items=expected_events,
                        items=events,
                        from_dict_args={
                            'allow_kind': self.assembly.is_official if self.assembly else False,  # TODO: lookup assembly's room if not given
                            'allow_track': allow_track,  # TODO
                            'room_lookup': lambda r_source_id: rooms.get(r_source_id),
                            'speaker_lookup': speaker_lookup,
                        },
                    )
                except Exception as err:
                    activity.append(
                        {
                            'action': 'error',
                            'type': 'event',
                            'source_id': e_id,
                            'local_id': e.get('uuid', None),
                            'message': str(err),
                        }
                    )
                    logger.exception('Import on ScheduleSource %s encountered exception on loading event "%s".', self.pk, e_id)
    
            # flag the non-loaded rooms as 'missing'
            for room_id in expected_rooms:
                activity.append(
                    {
                        'action': 'missing',
                        'type': 'room',
                        'source_id': None,
                        'local_id': str(room_id),
                    }
                )
    
            # flag the non-loaded events as 'missing'
            for event_id in expected_events:
                act = {
                    'action': 'missing',
                    'type': 'event',
                    'source_id': None,
                    'local_id': str(event_id),
                }
    
                # check if we should do something about the missing event
                match missing_events:
                    case 'delete':
                        Event.objects.filter(pk=event_id).delete()
                        act['action'] = 'deleted'
                    case _:
                        # 'depublish' is the default behaviour, so any other value is treated the same way
                        Event.objects.filter(pk=event_id).update(is_public=False, room=None)
    
                activity.append(act)
    
            return activity
    
    
    class ScheduleSourceMapping(models.Model):
        class MappingType(models.TextChoices):
            UNKNOWN = 'unknown', _('unknown')
            ROOM = 'room', _('Room')
            EVENT = 'event', _('Event')
            EVENT_ATTACHMENT = 'event_attachment', _('EventAttachment')
            SPEAKER = 'speaker', _('EventParticipant__type-speaker')
    
        schedule_source = models.ForeignKey(ScheduleSource, related_name='mappings', on_delete=models.CASCADE)
        mapping_type = models.CharField(max_length=20, choices=MappingType.choices)
    
        source_id = models.CharField(max_length=200)
        """
        An arbitrary identifier in the source system.
        Not evaluated by the hub itself - it is specific to the ScheduleSupport implementation in use.
        """
    
        local_id = models.UUIDField(blank=True, null=True)
        """
        The local UUID of the mapped object.
        See also: the local_object property
        """
    
        skip = models.BooleanField(default=False)
        """
        Indicates that the remote item shall be ignored.
        """
    
        extra_data = models.JSONField(blank=True, null=True)
        """
        Import-specific additional mapping information.
        Internal use only, not to be presented to the user.
        """
    
        def __init__(self, *args, **kwargs):
            self._local_object = kwargs.pop('local_object', None)
            super().__init__(*args, **kwargs)
    
        def get_local_object(self):
            """
            Fetches the local object based on the given mapping_type and local_id.
            May throw a LocalObjectAccessViolation if the associated assembly does not match.
    
            Consider using the local_object property which caches its result and works
            for newly created mappings by ScheduleSource.get_or_create_mapping(), too.
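
            Illustrative use (assumes `mapping` is an existing ScheduleSourceMapping):
            ```python
            try:
                obj = mapping.get_local_object()
            except LocalObjectAccessViolation:
                obj = None  # the mapped object belongs to another assembly/conference
            ```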
            """
    
            if self.mapping_type == self.MappingType.ROOM:
                room = Room.objects.get(pk=self.local_id)
                if room.assembly_id is not None:
                    if self.schedule_source.assembly_id is not None and room.assembly_id != self.schedule_source.assembly_id:
                        raise LocalObjectAccessViolation('Assembly of Room does not match.')
                elif self.schedule_source.assembly_id is not None and room.conference_id != self.schedule_source.assembly.conference_id:
                    raise LocalObjectAccessViolation('Conference of Room does not match.')
                return room
    
            if self.mapping_type == self.MappingType.EVENT:
                event = Event.objects.get(pk=self.local_id)
                if self.schedule_source.assembly_id is not None and event.assembly_id != self.schedule_source.assembly_id:
                    raise LocalObjectAccessViolation('Assembly of Event does not match.')
                return event
    
            if self.mapping_type == self.MappingType.EVENT_ATTACHMENT:
                attachment = EventAttachment.objects.prefetch_related('event').get(pk=self.local_id)
                if self.schedule_source.assembly_id is not None and attachment.event.assembly_id != self.schedule_source.assembly_id:
                    raise LocalObjectAccessViolation('Assembly of EventAttachment does not match.')
                return attachment
    
            if self.mapping_type == self.MappingType.SPEAKER:
                speaker = PlatformUser.objects.get(pk=self.local_id)
                if not speaker.is_person:
                    raise LocalObjectAccessViolation("Referenced speaker's PlatformUser is not a person.")
                return speaker
    
            # we don't know about that mapping type, bail out
            raise LocalObjectAccessViolation('Unknown mapping.')
    
        @property
        def local_object(self):
            if self._local_object is None:
                self._local_object = self.get_local_object()
            return self._local_object
    
        def __str__(self):
            if self.skip:
                return f'{{ {self.mapping_type} "{self.source_id}" skip }}'
            return f'{{ {self.mapping_type} "{self.source_id}" -> "{self.local_id}" }}'
    
    
    class ScheduleSourceImport(models.Model):
        class State(models.TextChoices):
            UNKNOWN = 'unknown', _('ScheduleSourceImport__state-unknown')
            PREPARED = 'prepared', _('ScheduleSourceImport__state-prepared')
            STARTED = 'started', _('ScheduleSourceImport__state-started')
            CONNECTION_ERROR = 'conn_failed', _('ScheduleSourceImport__state-conn_failed')
            FETCHED = 'fetched', _('ScheduleSourceImport__state-fetched')
            IMPORTING = 'importing', _('ScheduleSourceImport__state-importing')
            IMPORT_ERROR = 'import_failed', _('ScheduleSourceImport__state-import_failed')
            TIMED_OUT = 'timeout', _('ScheduleSourceImport__state-timeout')
            COMPLETED = 'completed', _('ScheduleSourceImport__state-completed')
            WARNING = 'warning', _('ScheduleSourceImport__state-warning')
            ERROR = 'error', _('ScheduleSourceImport__state-error')
    
        class Meta:
            ordering = ['-start', '-end']
    
        ERROR_STATES = [State.CONNECTION_ERROR, State.IMPORT_ERROR, State.ERROR]
        SUCCESS_STATES = [State.COMPLETED, State.WARNING]
        WORK_STATES = [State.STARTED, State.IMPORTING]
    
        schedule_source = models.ForeignKey(ScheduleSource, related_name='imports', on_delete=models.CASCADE)
        state = models.CharField(max_length=20, choices=State.choices)
        summary = models.CharField(max_length=200, blank=True, null=True)
    
        start = models.DateTimeField(auto_now_add=True)
        """Timestamp of import start"""
        end = models.DateTimeField(null=True)
        """Timestamp of import end (successful or not)"""
    
        data = models.JSONField(blank=True, null=True)
        """
        The raw import data as fetched by the ScheduleSupport implementation.
        """
    
        @property
        def is_failed(self):
            return self.state in self.ERROR_STATES
    
        @property
        def is_inprogress(self):
            return self.state in self.WORK_STATES
    
        @property
        def is_success(self):
            return self.state in self.SUCCESS_STATES
    
        @property
        def is_beyond_deadline(self):
            deadline = self.start + self.schedule_source.import_timeout
            return timezone.now() >= deadline
    
        @property
        def text_color_class(self):
            if self.state == ScheduleSourceImport.State.WARNING:
                return 'text-warning'
            if self.is_success:
                return 'text-success'
            if self.is_failed:
                return 'text-danger'
            if self.is_inprogress:
                return 'text-primary'
    
            return 'text-muted'
    
        @property
        def errors(self):
            if not self.data:
                return None
            errors = [x for x in self.data.get('_activity', []) if x['action'] == 'error']
            return errors
    
        def do_import(self):
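            """
            Execute this prepared import: fetch the raw data via ScheduleTypeManager,
            feed it to ScheduleSource.load_data() and record the outcome.

            Ends in state COMPLETED, WARNING (completed with errors) or one of the error
            states. Returns True if the import ran through, False otherwise.
            """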
            self.refresh_from_db()
            assert self.state == self.State.PREPARED
    
            import_type = self.schedule_source.import_type
            if not ScheduleTypeManager.has_type(import_type):
                logger.error('[job %s] unknown import_type "%s"', self.pk, import_type)
                self.state = self.State.ERROR
                self.summary = f'ERROR: unknown import type "{ import_type }"'
                self.end = timezone.now()
                self.save(update_fields=['state', 'summary', 'end'])
                return False
    
            self.start = timezone.now()
            self.end = None
            self.state = self.State.STARTED
            self.save(update_fields=['start', 'state'])
            logger.info('[job %s] starting import', self.pk)
    
            try:
                configuration_overwrite = None
                data = ScheduleTypeManager.fetch(self, configuration_overwrite=configuration_overwrite)
    
            except Exception as err:
                logger.exception('[job %s] fetch failed: %s', self.pk, err)
    
                self.state = self.State.ERROR
                self.summary = f'FETCH ERROR: { err }'[:200]
                self.end = timezone.now()
                self.save(update_fields=['state', 'summary', 'end'])
                return False
    
            # update state, data has been set by .fetch() already
            self.state = self.State.IMPORTING
            self.save(update_fields=['state'])
            errors = []
    
            try:
                if len(data) == 1 and data.get('_ERROR'):
                    raise Exception(data['_ERROR'])
    
                activity = self.schedule_source.load_data(data)
                errors = [x for x in activity if x['action'] == 'error']
                msgs = []
    
                self.data['_activity'] = activity
                if errors:
                    # create list of unique errors for summary
                    msgs = list({x['message'].split('\n')[0] for x in errors})
    
                stats = (
                    ', '.join(
                        (t + '=' + str(sum(1 for x in activity if x['action'] == t)))
                        for t in ['added', 'changed', 'seen', 'deleted', 'missing', 'error', 'skipped']
                    )
                    + ' \n'
                    + ' \n'.join(msgs)
                )
                self.summary = f"{data.get('version') or ''}\nDONE: {stats}"[:200]
    
                if len(errors) > len(activity) / 2:
                    raise Exception('Too many errors, aborting import: ' + stats)
    
                self.save(update_fields=['data', 'summary'])
    
            except Exception as err:
                logger.exception('[job %s] import failed: %s', self.pk, err)
    
                self.state = self.State.IMPORT_ERROR
                self.summary = f'IMPORT ERROR: { err }'[:200]
                self.end = timezone.now()
                self.save(update_fields=['state', 'summary', 'end'])
                return False
    
            # update state, data has been set by .fetch() already
            self.state = self.State.WARNING if errors else self.State.COMPLETED
            self.end = timezone.now()
            self.save(update_fields=['state', 'end'])
            logger.info('[job %s] import complete (took %ss): %s', self.pk, self.end - self.start, self.summary)
    
            self.schedule_source.last_import = timezone.now()
            self.schedule_source.last_import_version = data.get('version')
            self.schedule_source.save(update_fields=['last_import'])
            return True