from datetime import datetime from html import escape from secrets import token_hex from shutil import copyfileobj from tempfile import SpooledTemporaryFile from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey from pydantic import UUID4 from sqlalchemy.orm import Session from . import models, schemas def get_item_by_uuid(db: Session, item_uuid: UUID4): return db.get(models.Item, item_uuid) def get_item_by_tag(db: Session, item_tag: str): return db.query(models.Item).filter(models.Item.tag == item_tag).first() def get_items_for_storage(db: Session, storage_name: str): return db.query(models.Storage).get(models.Storage.name == storage_name).items def get_storage(db: Session, storage_name: str): return db.get(models.Storage, storage_name) def get_stored_items(db: Session): return ( db.query(models.Item) .filter(models.Item.received_at != None) # noqa: E711 .filter(models.Item.deployed_at == None) # noqa: E711 .order_by(models.Item.storage, models.Item.addressee) .all() ) def get_storages(db: Session): return db.query(models.Storage).all() def prepare_item_shipping(db: Session, item: schemas.ItemCreatePrepareShipping): # we want the tag to be unique. # FIXME: this may never finish. tag = token_hex(3) while db.query(models.Item).filter(models.Item.tag == tag).count() > 0: tag = token_hex(3) new_item = models.Item(**item.dict(), tag=tag) db.add(new_item) db.commit() db.refresh(new_item) return new_item def add_item_with_image(db: Session, image: SpooledTemporaryFile): db_item = models.Item() db.add(db_item) db.commit() db_image = models.Image(item_uuid=db_item.uuid) db.add(db_image) db.commit() db.refresh(db_image) print(db_item.uuid) print(db_image.uuid) with open(f"./images/{ db_image.uuid }", "wb") as destination: try: copyfileobj(image.file, destination) finally: image.file.close return db_item def verify_signature(item: str, signature: str): public_key = Ed448PublicKey.from_public_bytes(bytes.fromhex(item.verification)) print(str(item.uuid)) print(signature) try: public_key.verify(bytes.fromhex(signature), bytes(str(item.uuid), "utf-8")) except InvalidSignature: return False return True def update_item(db: Session, item: schemas.Item, data: schemas.ItemUpdate): if data.addressee: item.addressee = escape(data.addressee) if data.team: item.team = escape(data.team) if data.amount: item.amount = data.amount db.commit() db.refresh(item) return item def receive_item_with_image(db: Session, item: schemas.ItemCreateByImageAtStorage): new_item = models.Item(storage_uuid=item.storage_uuid) db.add(new_item) db.commit() db.refresh(new_item) new_image = models.Image(item_uuid=new_item.uuid) db.add(new_image) db.commit() return new_item def receive_item(db: Session, item: schemas.Item, storage: schemas.Storage): item.received_at = datetime.now() item.deployed_at = None item.storage = storage.name db.commit() db.refresh(item) return item def deliver_item(db: Session, item: schemas.Item): item.deployed_at = datetime.now() db.commit() db.refresh(item) return item