""" Authentication middleware for JWT token validation with Supabase database integration. Supports dual authentication: JWT tokens for users and HuggingFace API key for admin access. """ from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from typing import Optional, Dict, Any import os import jwt import logging from datetime import datetime from dotenv import load_dotenv from ..services.database import get_user_session, get_user_by_username load_dotenv() security = HTTPBearer(auto_error=False) logger = logging.getLogger(__name__) def get_secret_key() -> str: """Get JWT secret key from environment""" secret_key = os.getenv("SECRET_KEY") if not secret_key: raise ValueError("SECRET_KEY environment variable not set. Cannot issue or verify JWTs.") return secret_key def get_jwt_issuer() -> Optional[str]: """Get JWT issuer from environment""" return os.getenv("JWT_ISSUER") def get_jwt_audience() -> Optional[str]: """Get JWT audience from environment""" return os.getenv("JWT_AUDIENCE") def get_hf_api_key() -> Optional[str]: """Get HuggingFace API key from environment""" return os.getenv("HF_API_KEY") async def authenticate_request(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool: """ Primary authentication dependency for protected endpoints. Implements dual authentication strategy: 1. HuggingFace API key (admin bypass) - simple string comparison 2. JWT token (user authentication) - cryptographic validation + session verification For JWT tokens: - Validates signature, expiration, audience, and issuer - Checks session validity in Supabase database via 'jti' claim - Rejects revoked sessions Returns True if authentication succeeds, otherwise raises HTTPException. """ expected_token = get_hf_api_key() if not credentials: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please provide a valid JWT token or HuggingFace API key.", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials # Check HuggingFace API key first (admin bypass - performance optimization) if expected_token and token == expected_token: return True # Validate JWT token with full session verification try: secret_key = get_secret_key() issuer = get_jwt_issuer() audience = get_jwt_audience() payload = jwt.decode( token, secret_key, algorithms=["HS256"], audience=audience, issuer=issuer ) # Check if session is still valid (not revoked) jti = payload.get("jti") if jti: session = await get_user_session(jti) if not session: logger.warning(f"JWT verification failed: Session has been revoked for jti: {jti}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Session has been revoked", headers={"WWW-Authenticate": "Bearer"}, ) return True except jwt.ExpiredSignatureError: logger.warning("JWT verification failed: Token has expired") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"}, ) except jwt.InvalidTokenError as e: logger.warning(f"JWT verification failed: Invalid token - {e}") # Potential Issue: Broad exception handling. Catching InvalidTokenError is a safe default # to avoid leaking error details, but it can make debugging harder. # Consider logging the specific error here for internal monitoring. pass # If neither verification method worked, deny access raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication token", headers={"WWW-Authenticate": "Bearer"}, ) async def optional_auth(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool: """ Optional authentication - doesn't raise errors if no token provided. Returns True if authentication is successful, False otherwise. Used for endpoints that can work with or without authentication. """ if not credentials: return False token = credentials.credentials expected_token = get_hf_api_key() # Check HF API key if expected_token and token == expected_token: return True # Check JWT token try: secret_key = get_secret_key() issuer = get_jwt_issuer() audience = get_jwt_audience() payload = jwt.decode( token, secret_key, algorithms=["HS256"], audience=audience, issuer=issuer ) # Check session validity jti = payload.get("jti") if jti: session = await get_user_session(jti) return session is not None return True except jwt.InvalidTokenError: return False async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]: """ Extract authenticated user data from JWT token. Returns: - User data dict if authenticated with valid JWT token - None if using HuggingFace API key (no user context) - None if not authenticated or invalid token For JWT tokens: - Validates token signature and session in Supabase - Retrieves full user data from database using 'sub' claim (username) """ if not credentials: return None token = credentials.credentials # Check if it's an HF API key (these don't have user context) expected_hf_token = get_hf_api_key() if expected_hf_token and token == expected_hf_token: return None # HF API key users don't have user context # Try to decode JWT token try: secret_key = get_secret_key() issuer = get_jwt_issuer() audience = get_jwt_audience() payload = jwt.decode( token, secret_key, algorithms=["HS256"], audience=audience, issuer=issuer ) # Check if session is still valid jti = payload.get("jti") if jti: session = await get_user_session(jti) if not session: return None # Get user data from database using username from token username = payload.get("sub") if username: user = await get_user_by_username(username) return user return None except jwt.InvalidTokenError: return None async def require_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]: """ Extract authenticated user data from JWT token - mandatory authentication. Use this dependency for endpoints that require user authentication. Raises HTTPException if: - No credentials provided - Using HuggingFace API key (no user context) - Invalid or expired JWT token - Revoked session Returns: User data dict from Supabase database """ user = await get_current_user(credentials) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required. Please provide a valid JWT token.", headers={"WWW-Authenticate": "Bearer"}, ) return user