"""
Authentication middleware for JWT token validation with Supabase database integration.
Supports dual authentication: JWT tokens for users and HuggingFace API key for admin access.
"""
import hmac
import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any

import jwt
from dotenv import load_dotenv
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from ..services.database import get_user_session, get_user_by_username
# Load variables from a local .env file (if one exists) into the process
# environment before any of the os.getenv() lookups below run.
load_dotenv()
# Bearer-token extractor shared by every dependency in this module.
# auto_error=False lets a request without an Authorization header reach the
# dependency with credentials=None, so each dependency decides how to react.
security = HTTPBearer(auto_error=False)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def get_secret_key() -> str:
    """Return the JWT signing secret read from the ``SECRET_KEY`` variable.

    Raises:
        ValueError: If ``SECRET_KEY`` is unset or empty.
    """
    configured = os.environ.get("SECRET_KEY")
    if configured:
        return configured
    raise ValueError("SECRET_KEY environment variable not set. Cannot issue or verify JWTs.")
def get_jwt_issuer() -> Optional[str]:
    """Return the expected JWT issuer from ``JWT_ISSUER``, or None if unset.

    An empty value (for example a bare ``JWT_ISSUER=`` line in a .env file)
    is reported as None, mirroring how get_secret_key() treats an empty
    SECRET_KEY as "not set". Without this, the empty string would be handed
    to the JWT decoder as if it were a configured issuer.
    """
    return os.getenv("JWT_ISSUER") or None
def get_jwt_audience() -> Optional[str]:
    """Return the expected JWT audience from ``JWT_AUDIENCE``, or None if unset.

    An empty value (for example a bare ``JWT_AUDIENCE=`` line in a .env file)
    is reported as None, mirroring how get_secret_key() treats an empty
    SECRET_KEY as "not set". Without this, the empty string would be handed
    to the JWT decoder as if it were a configured audience.
    """
    return os.getenv("JWT_AUDIENCE") or None
def get_hf_api_key() -> Optional[str]:
    """Return the value of the ``HF_API_KEY`` variable, or None when it is unset."""
    return os.environ.get("HF_API_KEY")
async def authenticate_request(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
    """
    Primary authentication dependency for protected endpoints.

    The bearer token is accepted if it is either:

    1. The HuggingFace API key (admin bypass) - compared in constant time
       against the value returned by get_hf_api_key().
    2. A JWT - decoded with HS256 using the configured secret, audience and
       issuer; if the payload carries a 'jti' claim, the matching session is
       looked up via get_user_session() and must exist.

    Returns:
        True if authentication succeeds.

    Raises:
        HTTPException: 401 when no credentials are supplied, the token has
            expired, the token is otherwise invalid, or the session lookup
            for its 'jti' returns nothing.
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured and a JWT has to be verified.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Please provide a valid JWT token or HuggingFace API key.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = credentials.credentials

    # Admin bypass. compare_digest avoids leaking how many leading characters
    # of the key matched through response timing; encoding to bytes keeps it
    # safe for non-ASCII input, which compare_digest rejects for str.
    expected_token = get_hf_api_key()
    if expected_token and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return True

    secret_key = get_secret_key()
    issuer = get_jwt_issuer()
    audience = get_jwt_audience()

    # Only the decode call can raise the JWT errors handled below, so it is
    # the only statement inside the try.
    try:
        payload = jwt.decode(
            token,
            secret_key,
            algorithms=["HS256"],
            audience=audience,
            issuer=issuer,
        )
    except jwt.ExpiredSignatureError:
        logger.warning("JWT verification failed: Token has expired")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None
    except jwt.InvalidTokenError as exc:
        # The specific reason is logged for operators; the client only gets
        # a generic message so validation details are not leaked.
        logger.warning("JWT verification failed: Invalid token - %s", exc)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from None

    # NOTE(review): a token without a 'jti' claim skips the session lookup
    # and therefore cannot be revoked - confirm the issuer always sets 'jti'.
    jti = payload.get("jti")
    if jti:
        session = await get_user_session(jti)
        if not session:
            logger.warning("JWT verification failed: Session has been revoked for jti: %s", jti)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Session has been revoked",
                headers={"WWW-Authenticate": "Bearer"},
            )

    return True
async def optional_auth(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
    """
    Optional authentication - never rejects the request for a missing or bad token.

    Used for endpoints that can work with or without authentication.

    Returns:
        True if the bearer token is the HuggingFace API key, or a JWT that
        decodes successfully and (when it carries a 'jti' claim) still has a
        session according to get_user_session(). False if no credentials were
        supplied, the JWT is invalid or expired, or the session lookup
        returns nothing.

    Raises:
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured and a JWT has to be verified.
    """
    if not credentials:
        return False

    token = credentials.credentials

    # Constant-time comparison so response timing does not reveal how much of
    # the API key matched; bytes are used because compare_digest rejects
    # non-ASCII str input.
    expected_token = get_hf_api_key()
    if expected_token and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return True

    secret_key = get_secret_key()
    issuer = get_jwt_issuer()
    audience = get_jwt_audience()

    try:
        payload = jwt.decode(
            token,
            secret_key,
            algorithms=["HS256"],
            audience=audience,
            issuer=issuer,
        )
    except jwt.InvalidTokenError:
        return False

    jti = payload.get("jti")
    if not jti:
        # No session identifier to check; the decoded token is accepted as is.
        return True

    session = await get_user_session(jti)
    # Truthiness (not `is not None`) so an empty lookup result is rejected,
    # matching the `if not session` checks in authenticate_request and
    # get_current_user.
    return bool(session)
async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
    """
    Extract authenticated user data from a JWT bearer token.

    Returns:
        - Whatever get_user_by_username() returns for the token's 'sub' claim
          when the JWT decodes successfully and its session (if a 'jti' claim
          is present) is still found by get_user_session().
        - None if no credentials were supplied.
        - None if the token is the HuggingFace API key (no user context).
        - None if the JWT is invalid or expired, its session lookup returns
          nothing, or it has no 'sub' claim.

    Raises:
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured and a JWT has to be verified.
    """
    if not credentials:
        return None

    token = credentials.credentials

    # HF API key callers are authenticated but have no user record.
    # Constant-time comparison so response timing does not reveal how much of
    # the key matched; bytes are used because compare_digest rejects
    # non-ASCII str input.
    expected_hf_token = get_hf_api_key()
    if expected_hf_token and hmac.compare_digest(
        token.encode("utf-8"), expected_hf_token.encode("utf-8")
    ):
        return None

    secret_key = get_secret_key()
    issuer = get_jwt_issuer()
    audience = get_jwt_audience()

    try:
        payload = jwt.decode(
            token,
            secret_key,
            algorithms=["HS256"],
            audience=audience,
            issuer=issuer,
        )
    except jwt.InvalidTokenError:
        return None

    # Reject tokens whose session can no longer be found.
    jti = payload.get("jti")
    if jti:
        session = await get_user_session(jti)
        if not session:
            return None

    # The 'sub' claim carries the username used for the user lookup.
    username = payload.get("sub")
    if not username:
        return None
    return await get_user_by_username(username)
async def require_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]:
    """
    Mandatory variant of get_current_user for endpoints that need a user.

    Delegates to get_current_user() and turns an empty result into a 401.
    That covers every case where get_current_user() yields nothing: missing
    credentials, the HuggingFace API key (no user context), an invalid or
    expired JWT, or a session that is no longer found.

    Returns:
        The user data resolved by get_current_user().

    Raises:
        HTTPException: 401 when no user could be resolved.
    """
    resolved = await get_current_user(credentials)
    if resolved:
        return resolved

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required. Please provide a valid JWT token.",
        headers={"WWW-Authenticate": "Bearer"},
    )
|