Skip to content
Snippets Groups Projects
Select Git revision
19 results Searching

check_migrations.py

Blame
  • Forked from uffd / uffd
    Source project has a limited visibility.
    utils.py 3.21 KiB
    from datetime import datetime
    from html import escape
    from secrets import token_hex
    from shutil import copyfileobj
    from tempfile import SpooledTemporaryFile
    
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
    from pydantic import UUID4
    from sqlalchemy.orm import Session
    
    from . import models, schemas
    
    
    def get_item_by_uuid(db: Session, item_uuid: UUID4):
        return db.get(models.Item, item_uuid)
    
    
    def get_item_by_tag(db: Session, item_tag: str):
        return db.query(models.Item).filter(models.Item.tag == item_tag).first()
    
    
    def get_items_for_storage(db: Session, storage_name: str):
        return db.query(models.Storage).get(models.Storage.name == storage_name).items
    
    
    def get_storage(db: Session, storage_name: str):
        return db.get(models.Storage, storage_name)
    
    
    def get_stored_items(db: Session):
        return (
            db.query(models.Item)
            .filter(models.Item.received_at != None)  # noqa: E711
            .filter(models.Item.deployed_at == None)  # noqa: E711
            .order_by(models.Item.storage, models.Item.addressee)
            .all()
        )
    
    
    def get_storages(db: Session):
        return db.query(models.Storage).all()
    
    
    def prepare_item_shipping(db: Session, item: schemas.ItemCreatePrepareShipping):
        # we want the tag to be unique.
        # FIXME: this may never finish.
        tag = token_hex(3)
        while db.query(models.Item).filter(models.Item.tag == tag).count() > 0:
            tag = token_hex(3)
        new_item = models.Item(**item.dict(), tag=tag)
        db.add(new_item)
        db.commit()
        db.refresh(new_item)
        return new_item
    
    
    def add_item_with_image(db: Session, image: SpooledTemporaryFile):
        db_item = models.Item()
        db.add(db_item)
        db.commit()
        db_image = models.Image(item_uuid=db_item.uuid)
        db.add(db_image)
        db.commit()
        db.refresh(db_image)
        print(db_item.uuid)
        print(db_image.uuid)
        with open(f"./images/{ db_image.uuid }", "wb") as destination:
            try:
                copyfileobj(image.file, destination)
            finally:
                image.file.close
        return db_item
    
    
    def update_item(db: Session, item: schemas.Item, data: schemas.ItemUpdate):
        public_key = Ed448PublicKey.from_public_bytes(bytes.fromhex(item.verification))
        verify = ""
        if data.addressee:
            verify += data.addressee
            item.addressee = escape(data.addressee)
        if data.team:
            verify += data.team
            item.team = escape(data.team)
        if data.amount:
            verify += str(data.amount)
            item.amount = data.amount
        try:
            public_key.verify(bytes.fromhex(data.signature), bytes(verify, "utf-8"))
        except InvalidSignature:
            return None
        db.commit()
        db.refresh(item)
        return item
    
    
    def receive_item_with_image(db: Session, item: schemas.ItemCreateByImageAtStorage):
        new_item = models.Item(storage_uuid=item.storage_uuid)
        db.add(new_item)
        db.commit()
        db.refresh(new_item)
        new_image = models.Image(item_uuid=new_item.uuid)
        db.add(new_image)
        db.commit()
        return new_item
    
    
    def receive_item(db: Session, item: schemas.Item, storage: schemas.Storage):
        item.received_at = datetime.now()
        item.storage = storage.name
        db.commit()
        db.refresh(item)
        return item