cardserver / app /core /auth.py
GitHub Actions
πŸš€ Auto-deploy from GitHub
e673ce2
"""
Authentication middleware for JWT token validation with Supabase database integration.
Supports dual authentication: JWT tokens for users and HuggingFace API key for admin access.
"""
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional, Dict, Any
import os
import jwt
import logging
from datetime import datetime
from dotenv import load_dotenv
from ..services.database import get_user_session, get_user_by_username
load_dotenv()
security = HTTPBearer(auto_error=False)
logger = logging.getLogger(__name__)
def get_secret_key() -> str:
"""Get JWT secret key from environment"""
secret_key = os.getenv("SECRET_KEY")
if not secret_key:
raise ValueError("SECRET_KEY environment variable not set. Cannot issue or verify JWTs.")
return secret_key
def get_jwt_issuer() -> Optional[str]:
"""Get JWT issuer from environment"""
return os.getenv("JWT_ISSUER")
def get_jwt_audience() -> Optional[str]:
"""Get JWT audience from environment"""
return os.getenv("JWT_AUDIENCE")
def get_hf_api_key() -> Optional[str]:
"""Get HuggingFace API key from environment"""
return os.getenv("HF_API_KEY")
async def authenticate_request(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
"""
Primary authentication dependency for protected endpoints.
Implements dual authentication strategy:
1. HuggingFace API key (admin bypass) - simple string comparison
2. JWT token (user authentication) - cryptographic validation + session verification
For JWT tokens:
- Validates signature, expiration, audience, and issuer
- Checks session validity in Supabase database via 'jti' claim
- Rejects revoked sessions
Returns True if authentication succeeds, otherwise raises HTTPException.
"""
expected_token = get_hf_api_key()
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required. Please provide a valid JWT token or HuggingFace API key.",
headers={"WWW-Authenticate": "Bearer"},
)
token = credentials.credentials
# Check HuggingFace API key first (admin bypass - performance optimization)
if expected_token and token == expected_token:
return True
# Validate JWT token with full session verification
try:
secret_key = get_secret_key()
issuer = get_jwt_issuer()
audience = get_jwt_audience()
payload = jwt.decode(
token,
secret_key,
algorithms=["HS256"],
audience=audience,
issuer=issuer
)
# Check if session is still valid (not revoked)
jti = payload.get("jti")
if jti:
session = await get_user_session(jti)
if not session:
logger.warning(f"JWT verification failed: Session has been revoked for jti: {jti}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Session has been revoked",
headers={"WWW-Authenticate": "Bearer"},
)
return True
except jwt.ExpiredSignatureError:
logger.warning("JWT verification failed: Token has expired")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.InvalidTokenError as e:
logger.warning(f"JWT verification failed: Invalid token - {e}")
# Potential Issue: Broad exception handling. Catching InvalidTokenError is a safe default
# to avoid leaking error details, but it can make debugging harder.
# Consider logging the specific error here for internal monitoring.
pass
# If neither verification method worked, deny access
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication token",
headers={"WWW-Authenticate": "Bearer"},
)
async def optional_auth(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
"""
Optional authentication - doesn't raise errors if no token provided.
Returns True if authentication is successful, False otherwise.
Used for endpoints that can work with or without authentication.
"""
if not credentials:
return False
token = credentials.credentials
expected_token = get_hf_api_key()
# Check HF API key
if expected_token and token == expected_token:
return True
# Check JWT token
try:
secret_key = get_secret_key()
issuer = get_jwt_issuer()
audience = get_jwt_audience()
payload = jwt.decode(
token,
secret_key,
algorithms=["HS256"],
audience=audience,
issuer=issuer
)
# Check session validity
jti = payload.get("jti")
if jti:
session = await get_user_session(jti)
return session is not None
return True
except jwt.InvalidTokenError:
return False
async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
"""
Extract authenticated user data from JWT token.
Returns:
- User data dict if authenticated with valid JWT token
- None if using HuggingFace API key (no user context)
- None if not authenticated or invalid token
For JWT tokens:
- Validates token signature and session in Supabase
- Retrieves full user data from database using 'sub' claim (username)
"""
if not credentials:
return None
token = credentials.credentials
# Check if it's an HF API key (these don't have user context)
expected_hf_token = get_hf_api_key()
if expected_hf_token and token == expected_hf_token:
return None # HF API key users don't have user context
# Try to decode JWT token
try:
secret_key = get_secret_key()
issuer = get_jwt_issuer()
audience = get_jwt_audience()
payload = jwt.decode(
token,
secret_key,
algorithms=["HS256"],
audience=audience,
issuer=issuer
)
# Check if session is still valid
jti = payload.get("jti")
if jti:
session = await get_user_session(jti)
if not session:
return None
# Get user data from database using username from token
username = payload.get("sub")
if username:
user = await get_user_by_username(username)
return user
return None
except jwt.InvalidTokenError:
return None
async def require_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]:
"""
Extract authenticated user data from JWT token - mandatory authentication.
Use this dependency for endpoints that require user authentication.
Raises HTTPException if:
- No credentials provided
- Using HuggingFace API key (no user context)
- Invalid or expired JWT token
- Revoked session
Returns: User data dict from Supabase database
"""
user = await get_current_user(credentials)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required. Please provide a valid JWT token.",
headers={"WWW-Authenticate": "Bearer"},
)
return user