""" FastAPI dependency injection. Provides dependencies for route handlers. """ from typing import Annotated from fastapi import Depends from fastapi import Header from fastapi import HTTPException from fastapi import Request from fastapi import status from app.config import Settings from app.config import get_settings from app.ml.model_loader import ModelLoader from app.services.voice_detector import VoiceDetector from app.utils.logger import get_logger logger = get_logger(__name__) def get_model_loader(request: Request) -> ModelLoader: """ Get ModelLoader from app state. Args: request: FastAPI request object Returns: ModelLoader instance from app state """ if hasattr(request.app.state, "model_loader"): return request.app.state.model_loader return ModelLoader() def get_voice_detector( model_loader: Annotated[ModelLoader, Depends(get_model_loader)], ) -> VoiceDetector: """ Get VoiceDetector instance. Args: model_loader: ModelLoader from dependency Returns: Configured VoiceDetector instance """ return VoiceDetector(model_loader=model_loader) async def validate_api_key( x_api_key: Annotated[ str | None, Header( alias="x-api-key", description="API key for authentication", ), ] = None, settings: Annotated[Settings, Depends(get_settings)] = None, # type: ignore ) -> str: """ Validate API key from request header. Args: x_api_key: API key from x-api-key header settings: Application settings Returns: Validated API key Raises: HTTPException: 401 if API key is missing or invalid """ if settings is None: settings = get_settings() if not x_api_key: logger.warning("Request without API key") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="API key is required. Provide it in the x-api-key header.", headers={"WWW-Authenticate": "ApiKey"}, ) # Get valid API keys valid_keys = settings.api_keys_list if not valid_keys: logger.error("No API keys configured on server") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Server configuration error", ) # Constant-time comparison to prevent timing attacks key_valid = False for valid_key in valid_keys: if _constant_time_compare(x_api_key, valid_key): key_valid = True break if not key_valid: logger.warning( "Invalid API key attempt", key_prefix=x_api_key[:8] + "..." if len(x_api_key) > 8 else "***", ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key", headers={"WWW-Authenticate": "ApiKey"}, ) return x_api_key def _constant_time_compare(val1: str, val2: str) -> bool: """ Constant-time string comparison to prevent timing attacks. Args: val1: First string val2: Second string Returns: True if strings are equal """ if len(val1) != len(val2): return False result = 0 for x, y in zip(val1, val2): result |= ord(x) ^ ord(y) return result == 0 # Type aliases for cleaner route signatures ValidatedApiKey = Annotated[str, Depends(validate_api_key)] VoiceDetectorDep = Annotated[VoiceDetector, Depends(get_voice_detector)] SettingsDep = Annotated[Settings, Depends(get_settings)]