Skip to content
Snippets Groups Projects
Commit c3f07876 authored by Julian's avatar Julian
Browse files

Incremental sync protoype

parent 6337c591
No related tags found
No related merge requests found
from flask import current_app
from flask_babel import get_locale
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, DateTime
from sqlalchemy.orm import relationship
from uffd.database import db
......@@ -46,6 +46,11 @@ class ServiceUser(db.Model):
user_id = Column(Integer(), ForeignKey('user.id', onupdate='CASCADE', ondelete='CASCADE'), nullable=False)
user = relationship('User', viewonly=True)
needs_sync = Column(Boolean())
last_sync = Column(DateTime())
sync_token = Column(String(32))
sync_started = Column(DateTime())
@property
def has_access(self):
return not self.service.limit_access or self.service.access_group in self.user.groups
......@@ -104,6 +109,27 @@ class ServiceUser(db.Model):
)
return query.filter(db.and_(db.not_(remailer_enabled_expr), AliasedUser.mail == email))
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def mark_service_users_changed(session, flush_context): # pylint: disable=unused-argument
user_ids = set()
service_ids = set()
for obj in session.dirty:
if isinstance(obj, User):
user_ids.add(obj.id)
# We modify ServiceUser during the sync API call, so if we want to react
# to ServiceUser changes, we must be more selective
#elif isinstance(obj, ServiceUser):
# user_ids.add(obj.user_id)
elif isinstance(obj, Service):
service_ids.add(obj.id)
ServiceUser.query.filter(db.or_(
ServiceUser.service_id.in_(service_ids),
ServiceUser.user_id.in_(user_ids),
)).update(dict(
needs_sync=True,
sync_token=None,
), synchronize_session=False)
@db.event.listens_for(db.Session, 'after_flush') # pylint: disable=no-member
def create_service_users(session, flush_context): # pylint: disable=unused-argument
# pylint completely fails to understand SQLAlchemy's query functions
......
import functools
import datetime
from secrets import token_hex
from flask import Blueprint, jsonify, request, abort
......@@ -158,3 +160,50 @@ def resolve_remailer():
if not service_user:
return jsonify(address=None)
return jsonify(address=service_user.real_email)
@bp.route('/sync', methods=['GET'])
@apikey_required('users')
def start_sync():
sync_token = token_hex(16)
service_users = ServiceUser.query.filter(
ServiceUser.service == request.api_client.service,
db.or_(
ServiceUser.needs_sync == True,
ServiceUser.needs_sync == None,
ServiceUser.last_sync < datetime.datetime.utcnow() - datetime.timedelta(hours=24),
),
db.or_(
ServiceUser.sync_token == None,
ServiceUser.sync_started < datetime.datetime.utcnow() - datetime.timedelta(seconds=10),
)
).order_by(ServiceUser.needs_sync, ServiceUser.last_sync.desc()).limit(10).with_for_update().all()
for service_user in service_users:
service_user.sync_started = datetime.datetime.utcnow()
service_user.sync_token = sync_token
db.session.commit()
return jsonify([
{
'sync_token': f'{sync_token}-{service_user.user_id}',
'data': generate_user_dict(service_user),
}
for service_user in service_users
])
@bp.route('/sync', methods=['POST'])
@apikey_required('users')
def finish_sync():
for result in request.json:
sync_token, user_id = result['sync_token'].split('-', 1)
query = ServiceUser.query.filter_by(
service=request.api_client.service,
user_id=user_id,
sync_token=sync_token,
)
if result['status'] == 'ok':
query.update(dict(
sync_token=None,
last_sync=datetime.datetime.utcnow(),
needs_sync=False,
))
db.session.commit()
return 'OK', 200
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment