Spaces:
Sleeping
Sleeping
| """ | |
| Authentication middleware for JWT token validation with Supabase database integration. | |
| Supports dual authentication: JWT tokens for users and HuggingFace API key for admin access. | |
| """ | |
| from fastapi import HTTPException, status, Depends | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from typing import Optional, Dict, Any | |
| import os | |
| import jwt | |
| import logging | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from ..services.database import get_user_session, get_user_by_username | |
| load_dotenv() | |
| security = HTTPBearer(auto_error=False) | |
| logger = logging.getLogger(__name__) | |
| def get_secret_key() -> str: | |
| """Get JWT secret key from environment""" | |
| secret_key = os.getenv("SECRET_KEY") | |
| if not secret_key: | |
| raise ValueError("SECRET_KEY environment variable not set. Cannot issue or verify JWTs.") | |
| return secret_key | |
| def get_jwt_issuer() -> Optional[str]: | |
| """Get JWT issuer from environment""" | |
| return os.getenv("JWT_ISSUER") | |
| def get_jwt_audience() -> Optional[str]: | |
| """Get JWT audience from environment""" | |
| return os.getenv("JWT_AUDIENCE") | |
| def get_hf_api_key() -> Optional[str]: | |
| """Get HuggingFace API key from environment""" | |
| return os.getenv("HF_API_KEY") | |
| async def authenticate_request(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool: | |
| """ | |
| Primary authentication dependency for protected endpoints. | |
| Implements dual authentication strategy: | |
| 1. HuggingFace API key (admin bypass) - simple string comparison | |
| 2. JWT token (user authentication) - cryptographic validation + session verification | |
| For JWT tokens: | |
| - Validates signature, expiration, audience, and issuer | |
| - Checks session validity in Supabase database via 'jti' claim | |
| - Rejects revoked sessions | |
| Returns True if authentication succeeds, otherwise raises HTTPException. | |
| """ | |
| expected_token = get_hf_api_key() | |
| if not credentials: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Authentication required. Please provide a valid JWT token or HuggingFace API key.", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| token = credentials.credentials | |
| # Check HuggingFace API key first (admin bypass - performance optimization) | |
| if expected_token and token == expected_token: | |
| return True | |
| # Validate JWT token with full session verification | |
| try: | |
| secret_key = get_secret_key() | |
| issuer = get_jwt_issuer() | |
| audience = get_jwt_audience() | |
| payload = jwt.decode( | |
| token, | |
| secret_key, | |
| algorithms=["HS256"], | |
| audience=audience, | |
| issuer=issuer | |
| ) | |
| # Check if session is still valid (not revoked) | |
| jti = payload.get("jti") | |
| if jti: | |
| session = await get_user_session(jti) | |
| if not session: | |
| logger.warning(f"JWT verification failed: Session has been revoked for jti: {jti}") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Session has been revoked", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return True | |
| except jwt.ExpiredSignatureError: | |
| logger.warning("JWT verification failed: Token has expired") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Token has expired", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| except jwt.InvalidTokenError as e: | |
| logger.warning(f"JWT verification failed: Invalid token - {e}") | |
| # Potential Issue: Broad exception handling. Catching InvalidTokenError is a safe default | |
| # to avoid leaking error details, but it can make debugging harder. | |
| # Consider logging the specific error here for internal monitoring. | |
| pass | |
| # If neither verification method worked, deny access | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid authentication token", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| async def optional_auth(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool: | |
| """ | |
| Optional authentication - doesn't raise errors if no token provided. | |
| Returns True if authentication is successful, False otherwise. | |
| Used for endpoints that can work with or without authentication. | |
| """ | |
| if not credentials: | |
| return False | |
| token = credentials.credentials | |
| expected_token = get_hf_api_key() | |
| # Check HF API key | |
| if expected_token and token == expected_token: | |
| return True | |
| # Check JWT token | |
| try: | |
| secret_key = get_secret_key() | |
| issuer = get_jwt_issuer() | |
| audience = get_jwt_audience() | |
| payload = jwt.decode( | |
| token, | |
| secret_key, | |
| algorithms=["HS256"], | |
| audience=audience, | |
| issuer=issuer | |
| ) | |
| # Check session validity | |
| jti = payload.get("jti") | |
| if jti: | |
| session = await get_user_session(jti) | |
| return session is not None | |
| return True | |
| except jwt.InvalidTokenError: | |
| return False | |
| async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]: | |
| """ | |
| Extract authenticated user data from JWT token. | |
| Returns: | |
| - User data dict if authenticated with valid JWT token | |
| - None if using HuggingFace API key (no user context) | |
| - None if not authenticated or invalid token | |
| For JWT tokens: | |
| - Validates token signature and session in Supabase | |
| - Retrieves full user data from database using 'sub' claim (username) | |
| """ | |
| if not credentials: | |
| return None | |
| token = credentials.credentials | |
| # Check if it's an HF API key (these don't have user context) | |
| expected_hf_token = get_hf_api_key() | |
| if expected_hf_token and token == expected_hf_token: | |
| return None # HF API key users don't have user context | |
| # Try to decode JWT token | |
| try: | |
| secret_key = get_secret_key() | |
| issuer = get_jwt_issuer() | |
| audience = get_jwt_audience() | |
| payload = jwt.decode( | |
| token, | |
| secret_key, | |
| algorithms=["HS256"], | |
| audience=audience, | |
| issuer=issuer | |
| ) | |
| # Check if session is still valid | |
| jti = payload.get("jti") | |
| if jti: | |
| session = await get_user_session(jti) | |
| if not session: | |
| return None | |
| # Get user data from database using username from token | |
| username = payload.get("sub") | |
| if username: | |
| user = await get_user_by_username(username) | |
| return user | |
| return None | |
| except jwt.InvalidTokenError: | |
| return None | |
| async def require_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]: | |
| """ | |
| Extract authenticated user data from JWT token - mandatory authentication. | |
| Use this dependency for endpoints that require user authentication. | |
| Raises HTTPException if: | |
| - No credentials provided | |
| - Using HuggingFace API key (no user context) | |
| - Invalid or expired JWT token | |
| - Revoked session | |
| Returns: User data dict from Supabase database | |
| """ | |
| user = await get_current_user(credentials) | |
| if not user: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Authentication required. Please provide a valid JWT token.", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return user | |