Spaces:
Runtime error
Runtime error
File size: 2,531 Bytes
dd1b74d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 | """JWT authentication middleware and dependencies."""
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from models.user import User
from services.auth_service import decode_access_token
from database import get_db
# HTTP Bearer token security scheme
security = HTTPBearer()
class CredentialsError(Exception):
"""Custom exception for authentication errors."""
def __init__(self, detail: str):
self.detail = detail
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""
Dependency to get the current authenticated user from JWT token.
Args:
credentials: HTTP Bearer token credentials
db: Database session
Returns:
The authenticated User object
Raises:
HTTPException: 401 if token is invalid or expired
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise credentials_exception
user_id_str: Optional[str] = payload.get("sub")
if user_id_str is None:
raise credentials_exception
# Convert string sub to int
try:
user_id = int(user_id_str)
except (ValueError, TypeError):
raise credentials_exception
# Fetch user from database
user = db.get(User, user_id)
if user is None:
raise credentials_exception
return user
def get_optional_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(
HTTPBearer(auto_error=False)
),
db: Session = Depends(get_db)
) -> Optional[User]:
"""
Optional authentication dependency.
Returns None if no valid token is provided, rather than raising an exception.
"""
if credentials is None:
return None
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
return None
user_id_str: Optional[str] = payload.get("sub")
if user_id_str is None:
return None
# Convert string sub to int
try:
user_id = int(user_id_str)
except (ValueError, TypeError):
return None
user = db.get(User, user_id)
return user
|