Skip to content
Snippets Groups Projects
Commit 5038d148 authored by Julian's avatar Julian
Browse files

Incremental sync protoype

parent 409d7e66
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@ import enum
from flask import current_app
from flask_babel import get_locale
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum, DateTime
from sqlalchemy.orm import relationship, validates
from uffd.database import db
......@@ -55,6 +55,11 @@ class ServiceUser(db.Model):
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', viewonly=True)
needs_sync = Column(Boolean())
last_sync = Column(DateTime())
sync_token = Column(String(32))
sync_started = Column(DateTime())
@property
def has_access(self):
return not self.service.limit_access or self.service.access_group in self.user.groups
......@@ -168,6 +173,27 @@ class ServiceUser(db.Model):
)
return query.filter(db.and_(db.not_(remailer_enabled), real_email_matches))
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def mark_service_users_changed(session, flush_context): # pylint: disable=unused-argument
user_ids = set()
service_ids = set()
for obj in session.dirty:
if isinstance(obj, User):
user_ids.add(obj.id)
# We modify ServiceUser during the sync API call, so if we want to react
# to ServiceUser changes, we must be more selective
#elif isinstance(obj, ServiceUser):
# user_ids.add(obj.user_id)
elif isinstance(obj, Service):
service_ids.add(obj.id)
ServiceUser.query.filter(db.or_(
ServiceUser.service_id.in_(service_ids),
ServiceUser.user_id.in_(user_ids),
)).update(dict(
needs_sync=True,
sync_token=None,
), synchronize_session=False)
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def create_service_users(session, flush_context): # pylint: disable=unused-argument
# pylint completely fails to understand SQLAlchemy's query functions
......
import functools
import datetime
from secrets import token_hex
from flask import Blueprint, jsonify, request, abort, Response
......@@ -222,3 +224,50 @@ def prometheus_metrics():
registry.register(PLATFORM_COLLECTOR)
registry.register(UffdCollector())
return Response(response=generate_latest(registry=registry),content_type=CONTENT_TYPE_LATEST)
@bp.route('/sync', methods=['GET'])
@apikey_required('users')
def start_sync():
sync_token = token_hex(16)
service_users = ServiceUser.query.filter(
ServiceUser.service == request.api_client.service,
db.or_(
ServiceUser.needs_sync == True,
ServiceUser.needs_sync == None,
ServiceUser.last_sync < datetime.datetime.utcnow() - datetime.timedelta(hours=24),
),
db.or_(
ServiceUser.sync_token == None,
ServiceUser.sync_started < datetime.datetime.utcnow() - datetime.timedelta(seconds=10),
)
).order_by(ServiceUser.needs_sync, ServiceUser.last_sync.desc()).limit(10).with_for_update().all()
for service_user in service_users:
service_user.sync_started = datetime.datetime.utcnow()
service_user.sync_token = sync_token
db.session.commit()
return jsonify([
{
'sync_token': f'{sync_token}-{service_user.user_id}',
'data': generate_user_dict(service_user),
}
for service_user in service_users
])
@bp.route('/sync', methods=['POST'])
@apikey_required('users')
def finish_sync():
for result in request.json:
sync_token, user_id = result['sync_token'].split('-', 1)
query = ServiceUser.query.filter_by(
service=request.api_client.service,
user_id=user_id,
sync_token=sync_token,
)
if result['status'] == 'ok':
query.update(dict(
sync_token=None,
last_sync=datetime.datetime.utcnow(),
needs_sync=False,
))
db.session.commit()
return 'OK', 200
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment