Select Git revision
uffd-socketmap@.socket
main.py 4.64 KiB
from datetime import datetime, timedelta
from uuid import UUID
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from itsdangerous.serializer import Serializer
from itsdangerous import BadSignature
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from sqlalchemy.orm import Session
from . import schemas, utils
from .config import settings
from .database import SessionLocal, create_database
create_database()
app = FastAPI()
# CORS handling
origins = [settings.customer_url, settings.worker_url]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Rate Limiting for some endpoints
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Authentication setup
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
oauth2_tokener = Serializer(settings.signing_key)
# DB Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def check_token(token: str):
try:
timestamp = oauth2_tokener.loads(token)
if datetime.fromtimestamp(timestamp) > datetime.now():
return # success
except BadSignature:
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Routes
@app.post("/item/prepare", response_model=schemas.Item)
@limiter.limit("2/minute")
def add_item(
request: Request,
item: schemas.ItemCreatePrepareShipping,
db: Session = Depends(get_db),
):
return utils.prepare_item_shipping(db, item)
@app.post("/item/register", response_model=schemas.Item)
def add_item_with_image(
image: UploadFile,
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
):
check_token(token)
print(image.file)
return utils.add_item_with_image(db, image)
@app.post("/item/update/{item_uuid}", response_model=schemas.Item)
def update_item(
item_uuid: str, data: schemas.ItemUpdate, db: Session = Depends(get_db)
):
item = utils.get_item_by_uuid(db, UUID(item_uuid))
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return utils.update_item(db, item, data)
@app.get("/item/{item_uuid}", response_model=schemas.Item)
def get_item(item_uuid: str, db: Session = Depends(get_db)):
item = utils.get_item_by_uuid(db, UUID(item_uuid))
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.get("/items", response_model=list[schemas.Item])
def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
return utils.get_stored_items(db)
@app.get("/tag/{tag}", response_model=schemas.Item)
def get_item_by_tag(tag: str, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
item = utils.get_item_by_tag(db, tag)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
return item
@app.get("/storages", response_model=list[schemas.Storage])
def list_storages(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
return utils.get_storages(db)
@app.post("/checkin", response_model=schemas.Item)
def checkin_item_by_uuid(
checkin: schemas.ItemCheckin,
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
):
check_token(token)
item = utils.get_item_by_uuid(db, UUID(checkin.item_uuid))
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
storage = utils.get_storage(db, checkin.storage_name)
if storage is None:
raise HTTPException(status_code=404, detail="Storage not found")
return utils.receive_item(db, item, storage)
@app.post("/token")
def verify_supporter(form_data: OAuth2PasswordRequestForm = Depends()):
if form_data.password != settings.shared_secret:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {
"access_token": oauth2_tokener.dumps(
(datetime.now() + timedelta(minutes=settings.token_lifetime)).timestamp()
),
"token_type": "bearer",
}
@app.get("/token/check")
def check_token_validity(token: str = Depends(oauth2_scheme)):
check_token(token)