File size: 2,203 Bytes
5c244a3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | # Implements: T010
# Phase III - AI-Powered Todo Chatbot
# JWT Verification Middleware - Extracts user_id and rejects invalid tokens
from typing import Optional
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from src.core.config import settings
# JWT Configuration
JWT_SECRET = settings.jwt_secret
JWT_ALGORITHM = settings.jwt_algorithm
# Security scheme for FastAPI
security = HTTPBearer()
async def verify_jwt(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
"""
Verify JWT token and extract user_id.
Args:
credentials: HTTP Bearer token from Authorization header
Returns:
user_id: Extracted user UUID from JWT token
Raises:
HTTPException 401: If token is invalid, expired, or malformed
"""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
try:
# Decode and verify JWT token
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
# Extract user_id from payload
user_id: str = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token: missing user_id",
)
return user_id
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
async def get_current_user_id(credentials: HTTPAuthorizationCredentials = Security(security)) -> str:
"""
Dependency injection helper to get current user_id from JWT.
Usage in FastAPI endpoints:
user_id = await get_current_user_id(credentials)
Args:
credentials: HTTP Bearer token from Authorization header
Returns:
user_id: Extracted user UUID
"""
return await verify_jwt(credentials)
|