base.py
    import logging
    from abc import ABCMeta, abstractmethod
    from datetime import timedelta
    
    from django.conf import settings
    from django.utils import timezone
    from django.utils.module_loading import import_string
    
    
    def filter_additional_data(data: dict, computed_fields: dict | None = None, others=()) -> dict:
        computed_fields = computed_fields or {}
        excluded = {'guid', 'room', 'start', 'date', 'duration', 'title', 'abstract', 'description', 'language', *others}
        return {k: v for k, v in data.items() if v and k not in excluded} | computed_fields
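
    # A minimal usage sketch (hypothetical event data): falsy values and the core
    # schedule keys are dropped, computed fields are merged on top:
    #   filter_additional_data({'guid': 'x', 'track': 'Art', 'url': ''}, {'slug': 'art-1'})
    #   -> {'track': 'Art', 'slug': 'art-1'}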
    
    
    def schedule_time_to_timedelta(s: str) -> timedelta:
        if ':' in s:
            hours, minutes = s.split(':')
        else:
            hours, minutes = 0, s
    
        return timedelta(hours=int(hours), minutes=int(minutes))
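
    # Examples of the two accepted input formats ('H:MM' or plain minutes):
    #   schedule_time_to_timedelta('1:30')  # -> timedelta(hours=1, minutes=30)
    #   schedule_time_to_timedelta('45')    # -> timedelta(minutes=45)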
    
    
    class BaseScheduleSupport(metaclass=ABCMeta):
        identifier = None
        """
        An identifier which acts like a primary key for the schedule support class.
        """
    
        readonly = None
        """
        Flag indicating whether this support class is read-only, i.e. changes cannot
        be written back to the remote side via push().
        """
    
        configuration_fields = None
        """
        List of configuration fields presented to the user.
        The fields are specified as a dict with field identifiers as key and tuples consisting of
        (type (one of string, int, bool), default value, mandatory, translation text) as value.
    
        The user-configured values are provided to __init__ as keyword arguments.
        """
    
        last_import_version = None
    
        def __init__(self, schedule_source, configuration_overwrite=None):
            assert schedule_source is not None
            self._schedule_source = schedule_source
            self._remote_url = schedule_source.import_url
            self._configuration = schedule_source.import_configuration or {}
            if configuration_overwrite is not None:
                self._configuration.update(configuration_overwrite)
    
        @property
        def remote_url(self):
            return self._remote_url
    
        @property
        def schedule_source(self):
            return self._schedule_source
    
        def conf_default(self, key):
            fld = self.configuration_fields[key]
            return fld[1]
    
        def conf_value(self, key):
            # fetch the field's default first as this also checks whether the field exists at all
            default = self.conf_default(key)
    
            # check if the field got configured, if yes return that value
            if key in self._configuration:
                return self._configuration[key]
    
            # not configured explicitly, return default
            return default
    
        def sanitycheck_url(self):
            """
            Raises an exception if the URL uses an unsupported protocol.
            The file:// protocol is allowed only if the SCHEDULES_SUPPORT_FILE_PROTOCOL setting is enabled.
            """
            url = self.remote_url.lower()
    
            # we accept regular HTTP protocols
            if url.startswith(('http://', 'https://')):
                return
    
            # local file inclusion is allowed only if configured
            if url.startswith('file://'):
                if settings.SCHEDULES_SUPPORT_FILE_PROTOCOL:
                    return
                raise Exception('File Protocol links are not allowed.')
    
            # everything else causes an exception
            raise Exception('Unsupported protocol in URL.')
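
        # Illustrative behaviour of sanitycheck_url() (assuming SCHEDULES_SUPPORT_FILE_PROTOCOL is False):
        #   'https://example.org/schedule.xml' -> accepted
        #   'file:///tmp/schedule.xml'         -> raises ('File Protocol links are not allowed.')
        #   'ftp://example.org/schedule.xml'   -> raises ('Unsupported protocol in URL.')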
    
        @abstractmethod
        def ready(self):
            """
            This method checks if the provided URL and configuration are usable,
            i.e. the remote endpoint can be reached and queried via them.
    
            There is no return value; if something is wrong, an exception is raised
            whose message will be presented to the user.
            """
            raise NotImplementedError('ready() was not overridden for ScheduleSupport')
    
        @abstractmethod
        def fetch(self):
            """
            This method is the workhorse of the schedule support module:
            its job is to query upstream for the current set of data.
    
            It shall return a dictionary with keys 'rooms' and 'events',
            each containing a dictionary with entries mapping a source id
            to a dictionary which can be understood by Room.from_dict()
            and Event.from_dict() respectively.
    
            The hub will update events it already knows by the source id:
            all events need to have a unique but stable identifier, i.e.
            if the name of the event changes, the identifier must not change!
            """
            raise NotImplementedError('fetch() was not overridden for ScheduleSupport')
    
        def push(self, data):
            """
            This method is used by the hub to push updated room/event data
            to the source (if the readonly flag is not set).
            The data argument contains a dictionary exactly like the one
            returned by fetch(), built from the output of Room's and Event's
            .to_dict() methods.
            """
            if self.readonly:
                raise Exception('This ScheduleSupport is marked readonly, push() not supported.')
    
            raise NotImplementedError('push(data) was not overridden for ScheduleSupport')
    
    
    class _ScheduleTypeManager:
        def __init__(self):
            # the logger must exist before _initialize_from_settings() may log warnings
            self._logger = logging.getLogger(__name__)
            self.types = {}
            self._initialize_from_settings()
    
        def _check_class_for_sanity(self, klass):
            assert getattr(klass, 'identifier', None) is not None, 'Identifier must be set.'
            assert getattr(klass, 'readonly', None) in [True, False], 'Readonly indication must be a boolean.'
            flds = getattr(klass, 'configuration_fields', None)
    
            if flds is not None:
                if not isinstance(flds, dict):
                    raise AssertionError('Configuration fields is not a dict.')
    
                for fld_id, fld_data in flds.items():
                    if not isinstance(fld_data, tuple) or len(fld_data) != 4:
                        raise AssertionError(f'Configuration field "{fld_id}" is not a tuple (type, default-value, mandatory, translation).')
    
        def _initialize_from_settings(self):
            # SCHEDULE_SUPPORT should be a list of class names, let's try to load them
            for line in settings.SCHEDULE_SUPPORT:
                try:
                    klass = import_string(line)
                    klass_ident = getattr(klass, 'identifier')
    
                except ImportError as err:
                    raise ImportError(f'Failed to find/import schedule support "{line}".') from err
    
                except AttributeError as err:
                    raise ImportError(f'Failed to find identifier of schedule support "{line}".') from err
    
                if klass_ident in self.types:
                    if self.types[klass_ident] == klass:
                        # same class listed twice in the settings: warn and skip the duplicate entry
                        self._logger.warning('Duplicate import of %s for schedule support "%s".', line, klass_ident)
                        continue
    
                    raise ImportError(f'The identifier "{klass_ident}" of "{line}" is already in use by "{self.types[klass_ident]}".')
    
                self.register_class(klass_ident, klass)
    
        def register_class(self, klass_ident, klass):
            try:
                # check klass sanity
                self._check_class_for_sanity(klass)
    
            except AssertionError as err:
                raise ImportError(f'Sanity check for "{klass_ident}" failed: {err}') from err
    
            # all is well, store the association
            self.types[klass_ident] = klass
    
        def unregister_class(self, klass_ident):
            if klass_ident in self.types:
                del self.types[klass_ident]
    
        def has_type(self, import_type):
            return import_type in self.types
    
        def fetch(self, job, configuration_overwrite=None):
            """
            Fetch the data from the schedule source associated with the given import job.
    
            Instantiates the support class registered for the source's
            import_type and calls fetch() on it.
    
            Returns the data fetched.
            """
            assert job is not None
            assert job.state == job.State.STARTED
            self._logger.debug('[job %s] fetching data from source %s', job.pk, job.schedule_source)
    
            klass = self.types.get(job.schedule_source.import_type)
            if klass is None:
                msg = f'Schedule type "{job.schedule_source.import_type}" is unknown.'
                self._logger.error(msg)
                raise Exception(msg)
    
            self._logger.debug('Instantiating %s for source %s ...', klass, job.schedule_source)
            instance = klass(
                schedule_source=job.schedule_source,
                configuration_overwrite=configuration_overwrite,
            )
            self._logger.debug('Instantiated: %s', instance)
    
            try:
                instance.sanitycheck_url()
                instance.ready()
                self._logger.debug('[job %s] source %s instance is ready: %s', job.pk, job.schedule_source, instance)
    
            except Exception as err:
                self._logger.error('[job %s] instance signals not being ready: (%s) %s', job.pk, type(err), err)
                job.state = job.State.CONNECTION_ERROR
                job.data = {'_ERROR': str(err)}
                job.end = timezone.now()
                job.save(update_fields=['state', 'data', 'end'])
                return job.data
    
            try:
                job.state = job.State.STARTED
                job.save(update_fields=['state'])
    
                self._logger.debug('[job %s] fetch started', job.pk)
                instance.sanitycheck_url()
                data = instance.fetch()
                self._logger.debug('[job %s] fetch complete', job.pk)
    
                job.data = data
                job.state = job.State.FETCHED
    
            except Exception as err:
                self._logger.error('[job %s] fetch failed: (%s) %s', job.pk, type(err), err)
                job.state = job.State.CONNECTION_ERROR
                job.data = {'_ERROR': str(err)}
                raise
    
            finally:
                job.end = timezone.now()
                job.save(update_fields=['state', 'data', 'end'])
                self._logger.info('[job %s] fetch took %ss', job.pk, job.end - job.start)
    
            return job.data
    
    
    # "singleton" instance of the manager
    ScheduleTypeManager = _ScheduleTypeManager()
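
    # Minimal sketch (hypothetical class and module path) of how a concrete support
    # class would plug into the manager via settings.SCHEDULE_SUPPORT:
    #
    #   class JsonScheduleSupport(BaseScheduleSupport):
    #       identifier = 'json'
    #       readonly = True
    #       configuration_fields = {}
    #
    #       def ready(self):
    #           self.sanitycheck_url()  # e.g. verify the endpoint answers
    #
    #       def fetch(self):
    #           return {'rooms': {}, 'events': {}}
    #
    #   # settings.py
    #   SCHEDULE_SUPPORT = ['myapp.schedules.json_support.JsonScheduleSupport']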