""" API Authentication Provides API key authentication with environment-based configuration. """ import os import secrets from typing import Optional, Dict from pydantic import BaseModel from fastapi import HTTPException, status, Header, Query, Depends from fastapi.security import APIKeyHeader # API Key header scheme api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) class APIKeyInfo(BaseModel): """API key information.""" key: str name: str permissions: list = [] # Cache for API keys (loaded from environment) _valid_api_keys: Dict[str, APIKeyInfo] = {} def load_api_keys() -> Dict[str, APIKeyInfo]: """ Load API keys from environment variables. Format: API_KEY=your-api-key API_KEY_NAME=Production API_KEY_PERMISSIONS=read,write Multiple keys can be set with suffixes: API_KEY_1=... API_KEY_2=... """ global _valid_api_keys if _valid_api_keys: return _valid_api_keys keys = {} # Load primary API key primary_key = os.getenv("API_KEY") if primary_key: keys[primary_key] = APIKeyInfo( key=primary_key, name=os.getenv("API_KEY_NAME", "Default"), permissions=os.getenv("API_KEY_PERMISSIONS", "read,write").split(",") ) # Load additional API keys (API_KEY_1, API_KEY_2, etc.) for i in range(1, 10): key = os.getenv(f"API_KEY_{i}") if key: keys[key] = APIKeyInfo( key=key, name=os.getenv(f"API_KEY_{i}_NAME", f"Key {i}"), permissions=os.getenv(f"API_KEY_{i}_PERMISSIONS", "read,write").split(",") ) # Fallback to demo key in development (not production) if not keys and os.getenv("UNESDOC_PIPELINE_ENV") != "production": demo_key = "demo-key-12345" keys[demo_key] = APIKeyInfo( key=demo_key, name="Demo User", permissions=["read", "write"] ) _valid_api_keys = keys return keys def setup_demo_key(): """Initialize API keys on startup.""" load_api_keys() class APIKeyMissingError(HTTPException): """Error when API key is missing.""" def __init__(self): super().__init__( status_code=status.HTTP_403_FORBIDDEN, detail="Missing API key. Include 'X-API-Key' header." ) class APIKeyInvalidError(HTTPException): """Error when API key is invalid.""" def __init__(self): super().__init__( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key." ) async def verify_api_key( header_key: Optional[str] = Header(None, alias="X-API-Key"), query_key: Optional[str] = Query(None, alias="api_key") ) -> APIKeyInfo: """ Verify API key from header or query parameter. Args: header_key: API key from X-API-Key header query_key: API key from api_key query parameter Returns: APIKeyInfo if valid Raises: APIKeyMissingError: If no key provided APIKeyInvalidError: If key is invalid """ provided_key = header_key or query_key if not provided_key: raise APIKeyMissingError() # Load keys from environment valid_keys = load_api_keys() if not valid_keys: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="No API keys configured. Set API_KEY environment variable." ) # Use timing-safe comparison for valid_key, info in valid_keys.items(): if secrets.compare_digest(provided_key, valid_key): return info raise APIKeyInvalidError() async def verify_api_key_optional( header_key: Optional[str] = Header(None, alias="X-API-Key"), query_key: Optional[str] = Query(None, alias="api_key") ) -> Optional[APIKeyInfo]: """ Optionally verify API key (for public endpoints with enhanced access). Returns None if no key provided, APIKeyInfo if valid key. """ provided_key = header_key or query_key if not provided_key: return None valid_keys = load_api_keys() for valid_key, info in valid_keys.items(): if secrets.compare_digest(provided_key, valid_key): return info return None