Skip to content
Snippets Groups Projects

fix(auth): Protect /storages and /tag endpoints

Merged c-tim requested to merge c-tim/bgp_backend:fixAuth into staging
1 file
+ 18
8
Compare changes
  • Side-by-side
  • Inline
+ 18
8
@@ -5,6 +5,7 @@ from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
@@ -5,6 +5,7 @@ from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from itsdangerous.serializer import Serializer
from itsdangerous.serializer import Serializer
 
from itsdangerous import BadSignature
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from slowapi.util import get_remote_address
@@ -48,12 +49,19 @@ def get_db():
@@ -48,12 +49,19 @@ def get_db():
def check_token(token: str):
def check_token(token: str):
if datetime.fromtimestamp(oauth2_tokener.loads(token)) < datetime.now():
try:
raise HTTPException(
timestamp = oauth2_tokener.loads(token)
status_code=status.HTTP_401_UNAUTHORIZED,
if datetime.fromtimestamp(timestamp) > datetime.now():
detail="Invalid authentication credentials",
return # success
headers={"WWW-Authenticate": "Bearer"},
except BadSignature:
)
pass
 
 
raise HTTPException(
 
status_code=status.HTTP_401_UNAUTHORIZED,
 
detail="Invalid authentication credentials",
 
headers={"WWW-Authenticate": "Bearer"},
 
)
 
# Routes
# Routes
@@ -103,7 +111,8 @@ def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
@@ -103,7 +111,8 @@ def get_items(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
@app.get("/tag/{tag}", response_model=schemas.Item)
@app.get("/tag/{tag}", response_model=schemas.Item)
def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
def get_item_by_tag(tag: str, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
 
check_token(token)
item = utils.get_item_by_tag(db, tag)
item = utils.get_item_by_tag(db, tag)
if not item:
if not item:
raise HTTPException(status_code=404, detail="Item not found")
raise HTTPException(status_code=404, detail="Item not found")
@@ -111,7 +120,8 @@ def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
@@ -111,7 +120,8 @@ def get_item_by_tag(tag: str, db: Session = Depends(get_db)):
@app.get("/storages", response_model=list[schemas.Storage])
@app.get("/storages", response_model=list[schemas.Storage])
def list_storages(db: Session = Depends(get_db)):
def list_storages(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
 
check_token(token)
return utils.get_storages(db)
return utils.get_storages(db)
Loading