Skip to content
Snippets Groups Projects
Commit 28821e0a authored by c-tim's avatar c-tim
Browse files

Merge branch 'fixAuth' into 'staging'

fix(auth): Protect /storages and /tag endpoints

See merge request hanfi/bgp_backend!4
parents 95607b79 35a34ac8
No related branches found
No related tags found
2 merge requests!5merge from staging,!4fix(auth): Protect /storages and /tag endpoints
......@@ -5,6 +5,7 @@ from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from itsdangerous.serializer import Serializer
from itsdangerous import BadSignature
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
......@@ -48,12 +49,19 @@ def get_db():
def check_token(token: str):
if datetime.fromtimestamp(oauth2_tokener.loads(token)) < datetime.now():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
timestamp = oauth2_tokener.loads(token)
if datetime.fromtimestamp(timestamp) > datetime.now():
return # success
except BadSignature:
pass
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Routes
......@@ -103,7 +111,8 @@ def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
@app.get("/tag/{tag}", response_model=schemas.Item)
def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
def get_item_by_tag(tag: str, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
item = utils.get_item_by_tag(db, tag)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
......@@ -111,7 +120,8 @@ def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
@app.get("/storages", response_model=list[schemas.Storage])
def list_storages(db: Session = Depends(get_db)):
def list_storages(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
check_token(token)
return utils.get_storages(db)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment