Spaces:
Sleeping
Sleeping
File size: 3,042 Bytes
from fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from src.utils.security import verify_token
from src.models.user import User
from sqlalchemy.orm import Session
from src.database import get_db_session
class JWTBearer(HTTPBearer):
    """
    JWT Bearer token authentication dependency.

    The token is taken from the ``Authorization: Bearer <token>`` header or,
    when the request carries no ``Authorization`` header, from the
    ``access_token`` cookie. It is checked with ``verify_token`` and the
    ``sub`` claim of the resulting payload is stored on
    ``request.state.user_id``.
    """

    def __init__(self, auto_error: bool = True):
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request) -> Optional[str]:
        """
        Validate the JWT carried by the request.

        Args:
            request: FastAPI request object

        Returns:
            str: The validated token. Every failure path in this method
            raises, regardless of ``auto_error``; ``auto_error`` only
            controls whether the parent class raises on a malformed
            ``Authorization`` header or lets the cookie fallback run.

        Raises:
            HTTPException: 403 if no token is supplied, the scheme is not
                Bearer, the token fails verification, or the payload has no
                ``sub`` claim.
        """
        credentials: Optional[HTTPAuthorizationCredentials] = None
        # Only delegate to HTTPBearer when a header is actually present.
        # With auto_error=True the parent raises on a missing header, which
        # would make the cookie fallback below unreachable.
        if request.headers.get("Authorization"):
            credentials = await super().__call__(request)

        if credentials is not None:
            # Authentication schemes are case-insensitive; the parent class
            # already accepts "bearer" in any casing, so do the same here.
            if credentials.scheme.lower() != "bearer":
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Invalid authentication scheme."
                )
            token = credentials.credentials
        else:
            # Try to get token from cookie as fallback
            token = request.cookies.get("access_token")
            if not token:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="No token provided."
                )

        # Verify the token; a None result is treated as invalid or expired.
        # NOTE(review): payload is assumed to be dict-like (it is read with
        # .get below) - confirm against verify_token.
        payload = verify_token(token)
        if payload is None:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid or expired token."
            )

        # Add user ID to request state for later use
        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid token payload."
            )
        request.state.user_id = user_id
        return token
def verify_user_access(user_id: str, authenticated_user_id: str) -> bool:
    """
    Check whether a requested user ID belongs to the authenticated user.

    Args:
        user_id: The user ID taken from the request path/params
        authenticated_user_id: The user ID taken from the JWT token

    Returns:
        bool: True when both IDs compare equal, False otherwise
    """
    ids_match = user_id == authenticated_user_id
    return ids_match
def get_current_user_from_request(request: Request) -> str:
    """
    Read the authenticated user ID stored on the request state.

    Args:
        request: FastAPI request object

    Returns:
        str: The value of ``request.state.user_id``

    Raises:
        HTTPException: 401 if ``request.state`` has no ``user_id`` attribute
    """
    state = request.state
    # Guard clause: without a stored user ID the request is unauthenticated.
    if not hasattr(state, 'user_id'):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not authenticated"
        )
    return state.user_id