Select Git revision
Forked from
uffd / uffd
Source project has a limited visibility.
utils.py 4.78 KiB
from datetime import datetime
from html import escape
from random import choices
# from shutil import copyfileobj
from string import digits
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
from pydantic import UUID4
from sqlalchemy.orm import Session
from . import models, schemas
from .config import settings
# from tempfile import SpooledTemporaryFile
# helpers
def generate_tag():
return "".join(choices(digits, k=settings.tag_length))
def verify_signature(delivery: schemas.Delivery, data: str, signature: str):
public_key = Ed448PublicKey.from_public_bytes(bytes.fromhex(delivery.verification))
try:
public_key.verify(bytes.fromhex(signature), bytes(str(data), "utf-8"))
except InvalidSignature:
return False
return True
# main functions
def get_image_by_uuid(db: Session, image_uuid: UUID4):
return db.get(models.Image, image_uuid)
def get_item_by_uuid(db: Session, item_uuid: UUID4):
return db.get(models.Item, item_uuid)
def get_storage_by_name(db: Session, storage_name: str):
storage = db.get(models.Storage, storage_name)
storage.items = [
item for item in storage.items if item.deployed_at == None # noqa: E711
]
return db.get(models.Storage, storage_name)
def get_storages(db: Session):
storages = db.query(models.Storage).all()
for storage in storages:
storage.items = [
item for item in storage.items if item.deployed_at == None # noqa: E711
]
return storages
def get_delivery_by_uuid(db: Session, delivery_uuid: UUID4):
delivery = db.get(models.Delivery, delivery_uuid)
delivery.amount = len(delivery.items)
delivery.items = [
item for item in delivery.items if item.deployed_at == None # noqa: E711
]
return db.get(models.Delivery, delivery_uuid)
def get_delivery_by_tag(db: Session, delivery_tag: str):
delivery = (
db.query(models.Delivery).filter(models.Delivery.tag == delivery_tag).first()
)
delivery.amount = len(delivery.items)
delivery.items = [
item for item in delivery.items if item.deployed_at == None # noqa: E711
]
return db.query(models.Delivery).filter(models.Delivery.tag == delivery_tag).first()
def prepare_delivery(db: Session, verification: str):
tag = generate_tag()
while db.query(models.Delivery).filter(models.Delivery.tag == tag).count() > 0:
tag = generate_tag()
delivery = models.Delivery(verification=verification, tag=tag)
db.add(delivery)
db.commit()
db.refresh(delivery)
return delivery
def update_delivery_data(
db: Session, delivery_uuid: UUID4, update_data: schemas.DeliveryUpdate
):
delivery = db.get(models.Delivery, delivery_uuid)
if update_data.addressee:
delivery.addressee = escape(update_data.addressee)
if update_data.team:
delivery.team = escape(update_data.team)
db.commit()
db.refresh(delivery)
delivery.amount = len(delivery.items)
delivery.items = [
item for item in delivery.items if item.deployed_at == None # noqa: E711
]
return delivery
def add_item_for_delivery_at_storage(
db: Session, delivery_uuid: UUID4, storage_name: str
):
delivery = db.get(models.Delivery, delivery_uuid)
storage = db.get(models.Storage, storage_name)
item = models.Item(storage_name=storage_name, stored_at=storage, part_of=delivery)
db.add(item)
db.commit()
db.refresh(item)
return item
def deploy_item(db: Session, item_uuid: UUID4):
item = db.get(models.Item, item_uuid)
item.deployed_at = datetime.now()
db.commit()
db.refresh(item)
return item
# def add_item_with_image(db: Session, image: SpooledTemporaryFile):
# db_item = models.Delivery()
# db.add(db_item)
# db.commit()
# db_image = models.Image(item_uuid=db_item.uuid)
# db.add(db_image)
# db.commit()
# db.refresh(db_image)
# print(db_item.uuid)
# print(db_image.uuid)
# with open(f"./images/{ db_image.uuid }", "wb") as destination:
# try:
# copyfileobj(image.file, destination)
# finally:
# image.file.close
# return db_item
#
#
#
# def update_item(db: Session, item: schemas.Delivery, data: schemas.ItemUpdate):
# if data.addressee:
# item.addressee = escape(data.addressee)
# if data.team:
# item.team = escape(data.team)
# if data.amount:
# item.amount = data.amount
# db.commit()
# db.refresh(item)
# return item
#
#
# def receive_item_with_image(db: Session, item: schemas.ItemCreateByImageAtStorage):
# new_item = models.Delivery(storage_uuid=item.storage_uuid)
# db.add(new_item)
# db.commit()
# db.refresh(new_item)
# new_image = models.Image(item_uuid=new_item.uuid)
# db.add(new_image)
# db.commit()