Select Git revision
schedules.py
Forked from
hub / hub
869 commits behind the upstream repository.

HeJ authored
schedules.py 33.81 KiB
import logging
from datetime import timedelta
from hashlib import sha1
from typing import TYPE_CHECKING, Dict, List, Optional
from uuid import UUID, uuid4
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from ..fields import ConferenceReference
from ..schedules import ScheduleTypeManager
from ..utils import mail2uuid, mask_url, str2bool
from .assemblies import Assembly
from .events import Event, EventAttachment
from .rooms import Room
from .users import PlatformUser
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class LocalObjectAccessViolation(Exception):
    """
    Raised when the referenced/requested local object
    (e.g. the target of a ScheduleSourceMapping)
    is not accessible to the ScheduleSource's assembly.
    """
class ScheduleSource(models.Model):
    """
    Configuration of an external schedule import for a conference.

    Stores where and how to fetch the data (type, URL, configuration) and
    how often to do so (frequency, timeout).  A source without an assembly
    is treated as a "wildcard" source by the import logic.
    """

    # client-side generated surrogate primary key
    id = models.UUIDField(primary_key=True, editable=False, default=uuid4)
    conference = ConferenceReference(related_name='schedule_sources')
    # owning assembly; may be empty -> "wildcard" source (see get_or_create_mapping)
    assembly = models.ForeignKey(Assembly, blank=True, null=True, related_name='schedule_sources', on_delete=models.CASCADE)
    # key into ScheduleTypeManager's registered import implementations
    import_type = models.CharField(
        blank=False,
        max_length=20,
        help_text=_('ScheduleSource__import_type__help'),
        verbose_name=_('ScheduleSource__import_type'),
    )
    import_url = models.URLField(
        blank=False,
        help_text=_('ScheduleSource__import_url__help'),
        verbose_name=_('ScheduleSource__import_url'),
    )
    # free-form, implementation-specific settings; an 'auth' key is masked for display
    import_configuration = models.JSONField(
        blank=True,
        null=True,
        help_text=_('ScheduleSource__import_configuration__help'),
        verbose_name=_('ScheduleSource__import_configuration'),
    )
    # how often an automatic import is due; a zero duration disables auto-import
    import_frequency = models.DurationField(
        default=timedelta(hours=1),
        help_text=_('ScheduleSource__import_frequency__help'),
        verbose_name=_('ScheduleSource__import_frequency'),
    )
    # maximum runtime of a single import run
    import_timeout = models.DurationField(
        default=timedelta(minutes=5),
        help_text=_('ScheduleSource__import_timeout__help'),
        verbose_name=_('ScheduleSource__import_timeout'),
    )
    # timestamp of the last completed import (None until the first one)
    last_import = models.DateTimeField(
        blank=True,
        null=True,
        help_text=_('ScheduleSource__last_import__help'),
        verbose_name=_('ScheduleSource__last_import'),
    )
def __str__(self):
    """Show the import type together with the masked import URL."""
    return '{0}{{{1}}}'.format(self.import_type, self.import_url_masked)
@property
def import_url_masked(self):
    """The import URL with secrets masked, or None if no URL is set."""
    if self.import_url is None:
        return None
    return mask_url(self.import_url)
@property
def import_configuration_masked(self):
    """
    A display-safe copy of the import configuration: the 'auth' entry,
    if present, is replaced by asterisks.  Falsy configurations are
    returned unchanged.
    """
    config = self.import_configuration
    if not config or 'auth' not in config:
        return config
    masked = dict(config)
    masked['auth'] = '********'
    return masked
@property
def is_due(self):
    """
    Query if an import is due based on the last successful import and configured frequency.

    Returns False for a zero frequency (auto-import disabled), True if no
    import has happened yet, otherwise True once the configured frequency
    has elapsed since the last import.
    """
    # a frequency of 0 means: don't import automatically
    # BUGFIX: total_seconds must be *called*; the original compared the bound
    # method object to 0, which is never true, so a 0 frequency never
    # disabled automatic imports.
    if self.import_frequency.total_seconds() == 0:
        return False
    # no imports yet? we're due!
    if self.last_import is None:
        return True
    # calculate elapsed time and compare it against configured frequency
    timespan_since_last_import = timezone.now() - self.last_import
    return timespan_since_last_import >= self.import_frequency
@property
def latest_import(self) -> Optional['ScheduleSourceImport']:
    """The most recently started import run of this source, if any."""
    return self.imports.order_by('-start').first()
@property
def has_running_import(self):
    """
    Whether the latest import run of this source is currently running,
    i.e. still in a work state and not past its timeout.
    """
    latest_import = self.latest_import
    if latest_import is None:
        # no import at all, therefore no running one
        return False
    # if latest_import has completed, there is no running one (by definition)
    if not latest_import.is_inprogress:
        return False
    # check that timeout has not passed yet
    # NOTE(review): is_beyond_deadline currently returns True while the
    # deadline has NOT been reached (now < deadline) - the opposite of its
    # name - so this line does match the comment above.  Verify before
    # renaming or inverting either side.
    return latest_import.is_beyond_deadline
def _get_or_create_speaker(
    self,
    name: str,
    mail_guid: Optional[str | UUID] = None,
    addresses: Optional[List[str]] = None,
):
    """
    Find or create a PlatformUser of type SPEAKER for the given name.

    Matching order:
      1. by the deterministic username derived from mail_guid/addresses/name,
      2. by a verified mail address matching mail_guid or any given address,
      3. by display name among already-imported speaker users,
      4. otherwise a new speaker user is created.

    Returns a (user, created) tuple.
    Raises ValueError if no name is given or multiple candidates match.
    """
    if not name:
        raise ValueError('You need to provide a name for the speaker.')
    if mail_guid and not isinstance(mail_guid, UUID):
        mail_guid = UUID(mail_guid)
    # derive a deterministic username so re-imports find the same user again
    if mail_guid:
        speaker_username = '_speaker_' + str(mail_guid)
    elif addresses:
        speaker_username = '_speaker_' + sha1('\n'.join(sorted(addresses)).encode('utf-8')).hexdigest()
    else:
        speaker_username = '_speaker_' + sha1(name.encode('utf-8')).hexdigest()
    # try to find by the username
    if candidate := PlatformUser.objects.filter(username=speaker_username, user_type=PlatformUser.Type.SPEAKER).first():
        return candidate, False
    # we try to find a PlatformUser who has a verified email matching the given mail_guid or any of the supplied addresses
    candidates = []  # type: List[PlatformUser]
    for cm in self.conference.users.select_related('user').filter(user__user_type__in=PlatformUser.PERSON_TYPES).iterator():  # type: ConferenceMember
        for addr in cm.user.get_verified_mail_addresses():
            if mail_guid and mail_guid == mail2uuid(addr):
                candidates.append(cm.user)
                break  # exit the inner for-loop
            if addresses and addr in addresses:
                candidates.append(cm.user)
                break  # exit the inner for-loop
    # the good case: we found something \o/
    if len(candidates) == 1:
        return candidates[0], False
    # the very bad case: we found too many
    if len(candidates) > 1:
        raise ValueError('Multiple candidate speakers found: ' + '; '.join(str(x.pk) for x in candidates))
    # hail mary attempt: see if we have an imported speaker with the same name
    candidates = self.conference.users.select_related('user').filter(user__user_type=PlatformUser.Type.SPEAKER, user__display_name=name).all()
    if len(candidates) == 1:
        # BUGFIX: this queryset yields conference membership objects (as in the
        # loop above, which appends cm.user) - return the PlatformUser, not
        # the membership wrapper.
        return candidates[0].user, False
    # the expected case: nothing found, create a new one
    name_split = name.split(' ')
    user_kwargs = {
        'user_type': PlatformUser.Type.SPEAKER,
        'username': speaker_username,
        'show_name': True,
        'last_name': name_split.pop(),
        'first_name': ' '.join(name_split),
    }
    # set an existing UUID, if possible
    if mail_guid:
        user_kwargs['uuid'] = str(mail_guid)
    # create the PlatformUser
    new_user = PlatformUser(**user_kwargs)
    new_user.save()
    return new_user, True
def get_or_create_mapping(self, mapping_type, source_id, create_local_object=True, source_uuid=None, hints: dict | None = None):
    """
    Fetches the local object mapped by the given type and id.
    If the mapping does not exist yet, a new one is created.
    For a new mapping (and unless create_local_object is set to False),
    the local object is also created if mapping_type is a room, event or speaker.
    For event attachments the associated event would be needed,
    therefore no automatic creation is possible.

    Returns a tuple: mapping, created flag.
    Raises ValueError for ambiguous/unresolvable rooms and
    NotImplementedError for events on a wildcard source without a room match.
    """
    try:
        mapping = self.mappings.get(mapping_type=mapping_type, source_id=source_id)
        return mapping, False
    except ObjectDoesNotExist:
        # robustness: hints is optional but is read below
        hints = hints or {}
        lo = None
        if create_local_object:
            if mapping_type == ScheduleSourceMapping.MappingType.ROOM:
                try:
                    if source_uuid:
                        lo = Room.objects.get(conference=self.conference, pk=source_uuid)
                    elif self.assembly:
                        lo = Room.objects.get(conference=self.conference, assembly=self.assembly, name__iexact=source_id)
                    else:
                        lo = Room.objects.get(conference=self.conference, name__iexact=source_id)
                except Room.MultipleObjectsReturned:
                    raise ValueError('Room name is not unique, please provide room guid')
                except Room.DoesNotExist:
                    if self.assembly:
                        lo = Room(conference=self.conference, pk=source_uuid, assembly=self.assembly)
                    else:
                        raise ValueError('Cannot create room for wildcard schedule source.')
            elif mapping_type == ScheduleSourceMapping.MappingType.EVENT:
                assembly = self.assembly
                if assembly is None:
                    # wildcard schedulesource: lookup assembly based on referenced room
                    if hints and 'room_lookup' in hints and 'room_name' in hints:
                        r = hints['room_lookup'](hints['room_name'])
                        assembly = r.assembly
                if assembly is None:
                    raise NotImplementedError('Event on wildcard schedulesource could not be matched to an assembly.')
                lo = Event(conference=self.conference, pk=source_uuid, assembly=assembly)
            elif mapping_type == ScheduleSourceMapping.MappingType.SPEAKER:
                # use a named flag instead of `_`, which would shadow gettext
                lo, created = self._get_or_create_speaker(
                    mail_guid=source_uuid,
                    name=hints.get('name'),
                    addresses=hints.get('addresses'),
                )
        # BUGFIX: lo is None when create_local_object is False (or for
        # unsupported types) - the original unconditionally dereferenced
        # lo.pk and crashed in exactly that case.
        mapping = self.mappings.create(
            mapping_type=mapping_type,
            source_id=source_id,
            local_id=lo.pk if lo is not None else None,
            local_object=lo,
        )
        return mapping, True
def _load_dataitem(self, activity: list, item: dict, item_source_id: str, item_type: str, expected_items: list, items: dict, from_dict_args: dict) -> str:
    """
    Load a single room/event/speaker item from the import data.

    Looks up (or creates) the ScheduleSourceMapping for the item, honours
    skip and "_delete" markers, then creates, updates or deletes the mapped
    local object.  Appends an entry to `activity`, stores the local object
    in `items` and removes handled ids from `expected_items`.

    Returns the performed action: 'added', 'seen', 'deleted', 'skipped' or 'error'.
    Raises ValueError if the item has no usable source id.
    """
    if item_source_id is None:
        raise ValueError(f'Each {item_type} must have a unique "id" field.')
    # items carrying a truthy "_delete" marker shall be removed locally
    item_delete = str2bool(item.pop('_delete', 'n')) is True
    # try to derive a UUID from the source id or the item's uuid/guid fields
    item_source_uuid = None
    try:
        if not isinstance(item_source_id, int):
            item_source_uuid = UUID(item_source_id)
    except ValueError:
        pass
    try:
        candidate = item.get('uuid') or item.get('guid')
        if item_source_uuid is None and candidate and not isinstance(candidate, int):
            item_source_uuid = UUID(candidate)
    except ValueError:
        pass
    # assemble type-specific hints for mapping/object creation
    hints = {}
    if item_type == 'event':
        hints['room_lookup'] = from_dict_args.get('room_lookup')
        hints['room_name'] = item.get('room')
        hints['speaker_lookup'] = from_dict_args.get('speaker_lookup')
    elif item_type == 'speaker':
        hints['name'] = item.get('full_public_name') or item.get('public_name') or item.get('name')
        hints['addresses'] = item.get('addresses')
    mapping, new_mapping = self.get_or_create_mapping(
        mapping_type=item_type,
        source_id=str(item_source_id),
        create_local_object=not item_delete,
        source_uuid=item_source_uuid,
        hints=hints,
    )
    # an item we don't know about shall be deleted? skip it
    if mapping.skip or (new_mapping and item_delete):
        # new mapping got created already? delete it!
        # BUGFIX: the original called `item.delete()`, but `item` is a plain
        # dict - it is the freshly created mapping row that must be removed.
        if new_mapping and mapping.pk:
            mapping.delete()
        # log us skipping the entry
        activity.append(
            {
                'action': 'skipped',
                'type': item_type,
                'source_id': item_source_id,
                'local_id': None,
            }
        )
        # if we have the item locally but shall skip it, handle it as 'handled' anyway
        # so that it is not picked up again as e.g. 'missing_events'
        if not new_mapping and mapping.local_id and mapping.local_id in expected_items:
            expected_items.remove(mapping.local_id)
        # signal that we didn't actually load an item
        return 'skipped'
    if new_mapping:
        try:
            # load item's data
            if self.assembly:
                # assign the room's/event's assembly from this source
                from_dict_args['assembly'] = self.assembly
            elif 'assembly' in from_dict_args:
                # better be safe: disallow overwriting assembly in the wildcard case
                del from_dict_args['assembly']
            mapping.local_object.from_dict(
                conference=self.conference,
                existing=mapping.local_object,
                data=item,
                **from_dict_args,
            )
            # save the item with the freshly loaded data
            mapping.local_object.save()
            # update the mapping's local_id
            mapping.local_id = mapping.local_object.pk
            mapping.save()
        except Exception as err:
            # log the error
            items[item_source_id] = None
            activity.append(
                {
                    'action': 'error',
                    'type': item_type,
                    'source_id': item_source_id,
                    'local_id': None,
                    'message': str(err),
                }
            )
            # use the module logger, not the logging root
            logger.exception('Import on ScheduleSource %s encountered exception on creating mapping for %s "%s".', self.pk, item_type, item_source_id)
            # ... and delete the incomplete (wrong) mapping
            mapping.delete()
            return 'error'
        else:
            # store new item's data
            items[item_source_id] = mapping.local_object
            activity.append(
                {
                    'action': 'added',
                    'type': item_type,
                    'source_id': item_source_id,
                    'local_id': str(mapping.local_id),
                }
            )
            return 'added'
    # note that we've seen the existing item in the imported data
    if mapping.local_id in expected_items:
        expected_items.remove(mapping.local_id)
    else:
        logger.warning('[job %s] Local %s id (%s) is not in the list of expected items.', self.id, item_type, mapping.local_id)
    # existing item shall be deleted
    if item_delete:
        mapping.local_object.delete()
        items[item_source_id] = None
        activity.append(
            {
                'action': 'deleted',
                'type': item_type,
                'source_id': item_source_id,
                'local_id': str(mapping.local_id),
            }
        )
        return 'deleted'
    # update data on existing item
    if self.assembly:
        # assign the room's/event's assembly from this source
        from_dict_args['assembly'] = self.assembly
    elif 'assembly' in from_dict_args:
        # better be safe: disallow overwriting assembly in the wildcard case
        del from_dict_args['assembly']
    mapping.local_object.from_dict(
        conference=self.conference,
        existing=mapping.local_object,
        data=item,
        **from_dict_args,
    )
    mapping.local_object.save()
    items[item_source_id] = mapping.local_object
    activity.append(
        {
            'action': 'seen',  # TODO: set to 'changed' if data was changed (returned by .from_dict()?)
            'type': item_type,
            'source_id': item_source_id,
            'local_id': str(mapping.local_id),
        }
    )
    return 'seen'
def load_data(self, data):
assert isinstance(data, dict)
assert 'rooms' in data and isinstance(data['rooms'], dict), 'There are no rooms in the data.'
assert 'events' in data and isinstance(data['events'], dict), 'There are no events in the data.'
# initialize activity log and list of events/rooms
activity = []
events = {}
rooms = {}
speakers = {}
# derive some flags
cfg = self.import_configuration or {}
missing_events = cfg.get('missing_events') or 'ignore'
replace_conference_slug_prefix = cfg.get('replace_conference_slug_prefix')
allow_track = cfg.get('import_tracks') or False
# note down all existing rooms, events and speakers so that we can call out the missing ones
if self.assembly:
expected_rooms = list(self.assembly.rooms.values_list('id', flat=True))
else:
expected_rooms = list(
self.mappings.filter(
mapping_type=ScheduleSourceMapping.MappingType.ROOM,
).values_list('local_id', flat=True)
)
expected_events = list(
self.mappings.filter(
mapping_type=ScheduleSourceMapping.MappingType.EVENT,
).values_list('local_id', flat=True)
)
expected_speakers = list(
self.mappings.filter(
mapping_type=ScheduleSourceMapping.MappingType.SPEAKER,
).values_list('local_id', flat=True)
)
def speaker_lookup(speaker_info: Dict[str, str]):
"""
Try to match the given speaker dict to a PlatformUser, if necessary creating a virtual one in the process.
Returns None if the speaker shall be skipped (explicitly, using ScheduleSourceMapping.skip=True).
Example:
```json
{
"id": 4711,
"guid": "c25334d0-9539-55e3-92b4-f559c384522b",
"name": "Hub Team",
"links": [
{
"url": "https://git.cccv.de/hub/hub",
"title": "Quellcode"
}
],
"biography": "Das Projekt-Team vom Hub, der Daten-Integrationsplattform von Congress & Camp.",
"avatar": "https://www.ccc.de/images/events.png",
"public_name": "Hub Team"
}
```
"""
# for the source id use the provided id, uuid or guid field (in order)
speaker_id = speaker_info.get('id') or speaker_info.get('uuid') or speaker_info.get('guid')
# sanity check: verify that required attributes are present
if not speaker_info:
raise ValueError('Missing required attribute in speaker_info: id/uuid/guid')
try:
action = self._load_dataitem(
activity=activity,
item=speaker_info,
item_source_id=speaker_id,
item_type='speaker',
expected_items=expected_speakers,
items=speakers,
from_dict_args={},
)
if action == 'skipped':
# if the speaker has been skipped throw it into the mapping table anyway
spk_mapping = self.mappings.get(mapping_type=ScheduleSourceMapping.MappingType.SPEAKER, source_id=speaker_id)
assert spk_mapping.skip
speakers[speaker_id] = spk_mapping.local_object
except Exception as err:
activity.append(
{
'action': 'error',
'type': 'speaker',
'source_id': speaker_id,
'local_id': None,
'message': str(err),
}
)
logging.exception('Import on ScheduleSource %s encountered exception on loading speaker "%s".', self.pk, speaker_id)
return speakers[speaker_id]
# first, load the rooms (as they're needed for events)
for r_id, r in data['rooms'].items():
try:
if replace_conference_slug_prefix and r.get('slug', '').startswith(replace_conference_slug_prefix):
r['slug'] = self.conference.slug + r['slug'][len(replace_conference_slug_prefix) :]
action = self._load_dataitem(
activity=activity,
item=r,
item_source_id=r_id,
item_type='room',
expected_items=expected_rooms,
items=rooms,
from_dict_args={
'allow_backend_roomtypes': False,
},
)
if action == 'skipped':
# if the room has been skipped throw it into the mapping table anyway
r_mapping = self.mappings.get(mapping_type=ScheduleSourceMapping.MappingType.ROOM, source_id=r_id)
assert r_mapping.skip
rooms[r_id] = r_mapping.local_object
except Exception as err:
activity.append(
{
'action': 'error',
'type': 'room',
'source_id': r_id,
'local_id': None,
'message': str(err),
}
)
logging.exception('Import on ScheduleSource %s encountered exception on loading room "%s".', self.pk, r_id)
# then load events
for e_id, e in data['events'].items():
try:
if replace_conference_slug_prefix and e.get('slug', '').startswith(replace_conference_slug_prefix):
e['slug'] = self.conference.slug + e['slug'][len(replace_conference_slug_prefix) :]
self._load_dataitem(
activity=activity,
item=e,
item_source_id=e_id,
item_type='event',
expected_items=expected_events,
items=events,
from_dict_args={
'allow_kind': self.assembly.is_official if self.assembly else False, # TODO: lookup assembly's room if not given
'allow_track': allow_track, # TODO
'room_lookup': lambda r_source_id: rooms.get(r_source_id),
'speaker_lookup': speaker_lookup,
},
)
except Exception as err:
activity.append(
{
'action': 'error',
'type': 'event',
'source_id': e_id,
'local_id': e.get('uuid', None),
'message': str(err),
}
)
logging.exception('Import on ScheduleSource %s encountered exception on loading event "%s".', self.pk, e_id)
# flag the non-loaded rooms as 'missing'
for room_id in expected_rooms:
activity.append(
{
'action': 'missing',
'type': 'room',
'source_id': None,
'local_id': str(room_id),
}
)
# flag the non-loaded events as 'missing'
for event_id in expected_events:
act = {
'action': 'missing',
'type': 'event',
'source_id': None,
'local_id': str(event_id),
}
# check if we should do something about the missing event
match missing_events:
case 'delete':
Event.objects.filter(pk=event_id).delete()
act['action'] = 'deleted'
case 'depublish':
Event.objects.filter(pk=event_id).update(is_public=False, room=None)
# Depublish is default, but python does not allow ['depublish', _]
case _:
Event.objects.filter(pk=event_id).update(is_public=False, room=None)
activity.append(act)
return activity
class ScheduleSourceMapping(models.Model):
    """
    Maps an identifier from an external schedule source to a local object
    (a Room, Event, EventAttachment or speaker PlatformUser).
    """

    class MappingType(models.TextChoices):
        # type of local object the mapping points to
        UNKNOWN = 'unknown', _('unknown')
        ROOM = 'room', _('Room')
        EVENT = 'event', _('Event')
        EVENT_ATTACHMENT = 'event_attachment', _('EventAttachment')
        SPEAKER = 'speaker', _('EventParticipant__type-speaker')

    schedule_source = models.ForeignKey(ScheduleSource, related_name='mappings', on_delete=models.CASCADE)
    mapping_type = models.CharField(max_length=20, choices=MappingType.choices)
    source_id = models.CharField(max_length=200)
    """
    An arbitrary identifier in the source system.
    Not evaluated by the hub itself - it is specific to the ScheduleSupport implementation in use.
    """
    local_id = models.UUIDField(blank=True, null=True)
    """
    The local UUID of the mapped object.
    See also: the local_object property
    """
    skip = models.BooleanField(default=False)
    """
    Indicates that the remote item shall be ignored.
    """
    extra_data = models.JSONField(blank=True, null=True)
    """
    Import-specific additional mapping information.
    Internal use only, not to be presented to the user.
    """
def __init__(self, *args, **kwargs):
    """Accept an optional 'local_object' kwarg to pre-seed the local_object cache."""
    # pull our custom kwarg out before Django's Model.__init__ sees it
    cached_object = kwargs.pop('local_object', None)
    self._local_object = cached_object
    super().__init__(*args, **kwargs)
def get_local_object(self):
    """
    Fetches the local object based on the given mapping_type and local_id.
    May throw a LocalObjectAccessViolation if the associated assembly does not match.
    Consider using the local_object property which caches its result and works
    for newly created mappings by ScheduleSource.get_or_create_mapping(), too.
    """
    if self.mapping_type == self.MappingType.ROOM:
        room = Room.objects.get(pk=self.local_id)
        # rooms are checked against the source's assembly if both sides have
        # one, otherwise at least against the source assembly's conference
        if room.assembly_id is not None:
            if self.schedule_source.assembly_id is not None and room.assembly_id != self.schedule_source.assembly_id:
                raise LocalObjectAccessViolation('Assembly of Room does not match.')
        elif self.schedule_source.assembly_id is not None and room.conference_id != self.schedule_source.assembly.conference_id:
            raise LocalObjectAccessViolation('Conference of Room does not match.')
        return room
    if self.mapping_type == self.MappingType.EVENT:
        event = Event.objects.get(pk=self.local_id)
        # events must belong to the source's assembly (when the source has one)
        if self.schedule_source.assembly_id is not None and event.assembly_id != self.schedule_source.assembly_id:
            raise LocalObjectAccessViolation('Assembly of Event does not match.')
        return event
    if self.mapping_type == self.MappingType.EVENT_ATTACHMENT:
        attachment = EventAttachment.objects.prefetch_related('event').get(pk=self.local_id)
        # attachments are checked via their event's assembly
        if self.schedule_source.assembly_id is not None and attachment.event.assembly_id != self.schedule_source.assembly_id:
            raise LocalObjectAccessViolation('Assembly of EventAttachment does not match.')
        return attachment
    if self.mapping_type == self.MappingType.SPEAKER:
        speaker = PlatformUser.objects.get(pk=self.local_id)
        # speakers must reference a person-type user
        if not speaker.is_person:
            raise LocalObjectAccessViolation("Referenced speaker's PlatformUser is not a person.")
        return speaker
    # we don't know about that mapping type, bail out
    raise LocalObjectAccessViolation('Unknown mapping.')
@property
def local_object(self):
    """The mapped local object, fetched via get_local_object() on first access and cached afterwards."""
    cached = self._local_object
    if cached is None:
        cached = self.get_local_object()
        self._local_object = cached
    return cached
def __str__(self):
    """Render the mapping as '{ type "source" -> "local" }' (or '... skip }')."""
    type_and_source = f'{self.mapping_type} "{self.source_id}"'
    if self.skip:
        return '{ ' + type_and_source + ' skip }'
    return '{ ' + type_and_source + f' -> "{self.local_id}"' + ' }'
class ScheduleSourceImport(models.Model):
    """
    A single import run of a ScheduleSource, recording its state,
    timing, a short summary and the raw fetched data.
    """

    class State(models.TextChoices):
        # lifecycle: prepared -> started -> fetched -> importing -> completed/warning,
        # plus dedicated error states for the individual phases
        UNKNOWN = 'unknown', _('ScheduleSourceImport__state-unknown')
        PREPARED = 'prepared', _('ScheduleSourceImport__state-prepared')
        STARTED = 'started', _('ScheduleSourceImport__state-started')
        CONNECTION_ERROR = 'conn_failed', _('ScheduleSourceImport__state-conn_failed')
        FETCHED = 'fetched', _('ScheduleSourceImport__state-fetched')
        IMPORTING = 'importing', _('ScheduleSourceImport__state-importing')
        IMPORT_ERROR = 'import_failed', _('ScheduleSourceImport__state-import_failed')
        TIMED_OUT = 'timeout', _('ScheduleSourceImport__state-timeout')
        COMPLETED = 'completed', _('ScheduleSourceImport__state-completed')
        WARNING = 'warning', _('ScheduleSourceImport__state-warning')
        ERROR = 'error', _('ScheduleSourceImport__state-error')

    class Meta:
        # newest imports first
        ordering = ['-start', '-end']

    # state groups backing the is_failed/is_success/is_inprogress properties
    ERROR_STATES = [State.CONNECTION_ERROR, State.IMPORT_ERROR, State.ERROR]
    SUCCESS_STATES = [State.COMPLETED, State.WARNING]
    WORK_STATES = [State.STARTED, State.IMPORTING]

    schedule_source = models.ForeignKey(ScheduleSource, related_name='imports', on_delete=models.CASCADE)
    state = models.CharField(max_length=20, choices=State.choices)
    summary = models.CharField(max_length=200, blank=True, null=True)
    start = models.DateTimeField(auto_now_add=True)
    """Timestamp of import start"""
    end = models.DateTimeField(null=True)
    """Timestamp of import end (successful or not)"""
    data = models.JSONField(blank=True, null=True)
    """
    The raw import data as fetched by the ScheduleSupport implementation.
    """
@property
def is_failed(self):
    """True if this import ended in any of the error states."""
    return self.state in self.ERROR_STATES

@property
def is_inprogress(self):
    """True while the import is still being worked on (started/importing)."""
    return self.state in self.WORK_STATES

@property
def is_success(self):
    """True if the import finished successfully (completed or with warnings)."""
    return self.state in self.SUCCESS_STATES
@property
def is_beyond_deadline(self):
    """
    Compare the current time against this run's timeout deadline
    (start + the source's configured import_timeout).

    NOTE(review): the comparison is `now < deadline`, i.e. this returns True
    while the deadline has NOT yet passed - the opposite of what the name
    suggests.  ScheduleSource.has_running_import relies on the current
    behaviour, so any rename/inversion must change both places together.
    """
    deadline = self.start + self.schedule_source.import_timeout
    return timezone.now() < deadline
@property
def text_color_class(self):
    """Bootstrap text color class reflecting this import's current state."""
    if self.state == ScheduleSourceImport.State.WARNING:
        return 'text-warning'
    if self.is_success:
        return 'text-success'
    if self.is_failed:
        return 'text-danger'
    return 'text-primary' if self.is_inprogress else 'text-muted'
@property
def errors(self):
    """All 'error' entries from the recorded activity log, or None if no data was stored."""
    if not self.data:
        return None
    activity = self.data.get('_activity', [])
    return [entry for entry in activity if entry['action'] == 'error']
def do_import(self):
    """
    Execute this (prepared) import run end to end.

    Fetches the raw data via the registered ScheduleTypeManager import type,
    hands it to ScheduleSource.load_data() and records state transitions,
    a summary and timing information on this instance.

    Returns True on success (COMPLETED or WARNING), False on any error.
    """
    self.refresh_from_db()
    assert self.state == self.State.PREPARED
    import_type = self.schedule_source.import_type
    if not ScheduleTypeManager.has_type(import_type):
        logger.error('[job %s] unknown import_type "%s"', self.pk, import_type)
        self.state = self.State.ERROR
        self.summary = f'ERROR: unknown import type "{ import_type }"'
        self.end = timezone.now()
        self.save(update_fields=['state', 'summary', 'end'])
        return False
    self.start = timezone.now()
    self.end = None
    self.state = self.State.STARTED
    # BUGFIX: persist the reset of 'end' as well - it was assigned above but
    # missing from update_fields, so a stale end timestamp survived in the DB.
    self.save(update_fields=['start', 'end', 'state'])
    logger.info('[job %s] starting import', self.pk)
    try:
        configuration_overwrite = None
        data = ScheduleTypeManager.fetch(self, configuration_overwrite=configuration_overwrite)
    except Exception as err:
        logger.exception('[job %s] fetch failed: %s', self.pk, err)
        self.state = self.State.ERROR
        self.summary = f'FETCH ERROR: { err }'[:200]
        self.end = timezone.now()
        self.save(update_fields=['state', 'summary', 'end'])
        return False
    # update state, data has been set by .fetch() already
    self.state = self.State.IMPORTING
    self.save(update_fields=['state'])
    errors = []
    try:
        if len(data) == 1 and data.get('_ERROR'):
            raise Exception(data['_ERROR'])
        activity = self.schedule_source.load_data(data)
        errors = [x for x in activity if x['action'] == 'error']
        msgs = []
        self.data['_activity'] = activity
        if errors:
            # create list of unique errors for summary
            msgs = list({x['message'].split('\n')[0] for x in errors})
        stats = (
            ', '.join(
                (t + '=' + str(sum(1 for x in activity if x['action'] == t)))
                for t in ['added', 'changed', 'seen', 'deleted', 'missing', 'error', 'skipped']
            )
            + ' \n'
            + ' \n'.join(msgs)
        )
        self.summary = f"{data.get('version') or ''}\nDONE: {stats}"[:200]
        if len(errors) > len(activity) / 2:
            # more than half of the items failed: treat the whole run as failed
            raise Exception('Too many errors, aborting import: ' + stats)
        self.save(update_fields=['data', 'summary'])
    except Exception as err:
        logger.exception('[job %s] import failed: %s', self.pk, err)
        self.state = self.State.IMPORT_ERROR
        self.summary = f'IMPORT ERROR: { err }'[:200]
        self.end = timezone.now()
        self.save(update_fields=['state', 'summary', 'end'])
        return False
    # update state, data has been set by .fetch() already
    self.state = self.State.WARNING if errors else self.State.COMPLETED
    self.end = timezone.now()
    self.save(update_fields=['state', 'end'])
    logger.info('[job %s] import complete (took %ss): %s', self.pk, self.end - self.start, self.summary)
    self.schedule_source.last_import = timezone.now()
    # NOTE(review): 'last_import_version' is not a field on the ScheduleSource
    # model as visible here and is not listed in update_fields below, so this
    # assignment is never persisted - confirm whether the field exists
    # upstream and extend update_fields accordingly.
    self.schedule_source.last_import_version = data.get('version')
    self.schedule_source.save(update_fields=['last_import'])
    return True