"""FastAPI application: item, storage and token endpoints."""

import logging
from datetime import datetime, timedelta
from uuid import UUID

from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from itsdangerous import BadSignature
from itsdangerous.serializer import Serializer
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session

from . import schemas, utils
from .config import settings
from .database import SessionLocal, create_database

logger = logging.getLogger(__name__)

create_database()

app = FastAPI()

# CORS handling
origins = [settings.customer_url, settings.worker_url]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Rate Limiting for some endpoints
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Authentication setup
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
oauth2_tokener = Serializer(settings.signing_key)


# DB Dependency
def get_db():
    """Yield a database session and close it once the request is finished."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def check_token(token: str) -> None:
    """Reject the request unless *token* is correctly signed and not expired.

    The signed payload is read as a POSIX timestamp marking the moment the
    token expires.

    Raises:
        HTTPException: 401 when the signature/payload is invalid or the
            expiry time lies in the past.
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        expires_at = datetime.fromtimestamp(oauth2_tokener.loads(token))
    except BadSignature as err:
        # A forged or garbled token is an authentication failure, not a
        # server error, so it gets the same 401 as an expired token.
        raise credentials_error from err
    if expires_at < datetime.now():
        raise credentials_error


# Routes
@app.post("/item/prepare", response_model=schemas.Item)
@limiter.limit("2/minute")
def add_item(
    request: Request,
    item: schemas.ItemCreatePrepareShipping,
    db: Session = Depends(get_db),
):
    """Prepare an item for shipping; limited to two calls per minute.

    ``request`` is not used in the body; slowapi requires it in the
    signature of every rate-limited endpoint.
    """
    return utils.prepare_item_shipping(db, item)


@app.post("/item/register", response_model=schemas.Item)
def add_item_with_image(
    image: UploadFile,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Register an item from an uploaded image; requires a valid token."""
    check_token(token)
    logger.debug("Received image upload: %s", image.file)
    return utils.add_item_with_image(db, image)
def _parse_item_uuid(raw_uuid: str) -> UUID:
    """Convert *raw_uuid* to a ``UUID``; answer 422 if it is malformed."""
    try:
        return UUID(raw_uuid)
    except ValueError as err:
        # Without this, a malformed UUID from the client surfaces as a 500.
        raise HTTPException(status_code=422, detail="Invalid item UUID") from err


@app.post("/item/update/{item_uuid}", response_model=schemas.Item)
def update_item(
    item_uuid: str, data: schemas.ItemUpdate, db: Session = Depends(get_db)
):
    """Apply *data* to the item identified by *item_uuid*.

    Raises:
        HTTPException: 422 for a malformed UUID, 404 if no item matches.
    """
    item = utils.get_item_by_uuid(db, _parse_item_uuid(item_uuid))
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return utils.update_item(db, item, data)


@app.get("/item/{item_uuid}", response_model=schemas.Item)
def get_item(item_uuid: str, db: Session = Depends(get_db)):
    """Return the item identified by *item_uuid*.

    Raises:
        HTTPException: 422 for a malformed UUID, 404 if no item matches.
    """
    item = utils.get_item_by_uuid(db, _parse_item_uuid(item_uuid))
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item


@app.get("/items", response_model=list[schemas.Item])
def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Return the stored items; requires a valid token."""
    check_token(token)
    return utils.get_stored_items(db)


@app.get("/tag/{tag}", response_model=schemas.Item)
def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
    """Return the item carrying *tag*, or answer 404 if there is none."""
    item = utils.get_item_by_tag(db, tag)
    if not item:
        raise HTTPException(status_code=404, detail="Item not found")
    return item


@app.get("/storages", response_model=list[schemas.Storage])
def list_storages(db: Session = Depends(get_db)):
    """Return all storages."""
    return utils.get_storages(db)


@app.post("/checkin", response_model=schemas.Item)
def checkin_item_by_uuid(
    checkin: schemas.ItemCheckin,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
):
    """Check an item into a storage; requires a valid token.

    Raises:
        HTTPException: 422 for a malformed item UUID, 404 if the item or the
            storage does not exist.
    """
    check_token(token)
    item = utils.get_item_by_uuid(db, _parse_item_uuid(checkin.item_uuid))
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    storage = utils.get_storage(db, checkin.storage_name)
    if storage is None:
        raise HTTPException(status_code=404, detail="Storage not found")
    return utils.receive_item(db, item, storage)


@app.post("/token")
def verify_supporter(form_data: OAuth2PasswordRequestForm = Depends()):
    """Issue a signed bearer token when the shared secret is supplied.

    The token payload is its expiry time as a POSIX timestamp,
    ``settings.token_lifetime`` minutes from now.

    Raises:
        HTTPException: 400 if the submitted password is not the shared secret.
    """
    import secrets

    # Constant-time comparison so response timing does not leak how much of
    # the secret matched. Compared as bytes because compare_digest rejects
    # non-ASCII str. Assumes settings.shared_secret is a str (the original
    # `!=` against the form password implies this) - confirm in config.
    password_ok = secrets.compare_digest(
        form_data.password.encode("utf-8"),
        settings.shared_secret.encode("utf-8"),
    )
    if not password_ok:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    expires_at = datetime.now() + timedelta(minutes=settings.token_lifetime)
    return {
        "access_token": oauth2_tokener.dumps(expires_at.timestamp()),
        "token_type": "bearer",
    }