Skip to content
Snippets Groups Projects
Commit 63f02beb authored by hanfi's avatar hanfi
Browse files

Merge branch 'fixAuth' into 'main'

fix(auth): Protect /storages and /tag endpoints

Closes #1

See merge request hanfi/bgp_backend!3
parents 5033e83a 35a34ac8
Branches
No related tags found
1 merge request!3fix(auth): Protect /storages and /tag endpoints
......@@ -5,6 +5,7 @@ from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from itsdangerous.serializer import Serializer
from itsdangerous import BadSignature
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
......@@ -48,7 +49,13 @@ def get_db():
def check_token(token: str):
if datetime.fromtimestamp(oauth2_tokener.loads(token)) < datetime.now():
try:
timestamp = oauth2_tokener.loads(token)
if datetime.fromtimestamp(timestamp) > datetime.now():
return # success
except BadSignature:
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
......@@ -56,6 +63,7 @@ def check_token(token: str):
)
# Routes
@app.post("/item/prepare", response_model=schemas.Item)
@limiter.limit("2/minute")
......@@ -103,7 +111,8 @@ def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
@app.get("/tag/{tag}", response_model=schemas.Item)
def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
def get_item_by_tag(tag: str, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
item = utils.get_item_by_tag(db, tag)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
......@@ -111,7 +120,8 @@ def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
@app.get("/storages", response_model=list[schemas.Storage])
def list_storages(db: Session = Depends(get_db)):
def list_storages(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
return utils.get_storages(db)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment