File size: 3,139 Bytes
6b408d7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | """
API Key authentication middleware.
Provides middleware for API key validation.
"""
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
from app.config import get_settings
from app.utils.logger import get_logger
logger = get_logger(__name__)
class APIKeyMiddleware(BaseHTTPMiddleware):
"""
Middleware for API key authentication.
This middleware checks for valid API keys on protected endpoints.
Public endpoints (health, docs) are excluded from authentication.
"""
# Endpoints that don't require authentication
PUBLIC_PATHS: set[str] = {
"/api/health",
"/api/ready",
"/api/languages",
"/api/",
"/docs",
"/redoc",
"/openapi.json",
"/",
}
async def dispatch(self, request: Request, call_next):
"""
Process request and validate API key for protected endpoints.
Args:
request: Incoming request
call_next: Next middleware/handler
Returns:
Response from next handler or 401 error
"""
# Skip authentication for public paths
path = request.url.path.rstrip("/") or "/"
# Check if path is public
is_public = path in self.PUBLIC_PATHS or any(
path.startswith(public.rstrip("/")) for public in self.PUBLIC_PATHS if public != "/"
)
if is_public:
return await call_next(request)
# Get API key from header
api_key = request.headers.get("x-api-key")
if not api_key:
logger.warning(
"Request without API key",
path=path,
method=request.method,
)
return JSONResponse(
status_code=401,
content={
"status": "error",
"message": "API key is required",
},
headers={"WWW-Authenticate": "ApiKey"},
)
# Validate API key
settings = get_settings()
valid_keys = settings.api_keys_list
if not valid_keys:
logger.error("No API keys configured")
return JSONResponse(
status_code=500,
content={
"status": "error",
"message": "Server configuration error",
},
)
if api_key not in valid_keys:
logger.warning(
"Invalid API key",
path=path,
key_prefix=api_key[:8] + "..." if len(api_key) > 8 else "***",
)
return JSONResponse(
status_code=401,
content={
"status": "error",
"message": "Invalid API key",
},
headers={"WWW-Authenticate": "ApiKey"},
)
# Continue to next handler
return await call_next(request)
|