File size: 3,139 Bytes
6b408d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""

API Key authentication middleware.



Provides middleware for API key validation.

"""

import hmac

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse

from app.config import get_settings
from app.utils.logger import get_logger

# Module-level logger named after this module; the middleware below passes
# extra context to it as keyword arguments (e.g. ``path=...``).
logger = get_logger(__name__)


class APIKeyMiddleware(BaseHTTPMiddleware):
    """
    Middleware for API key authentication.

    Every request whose path is not public must carry a valid key in the
    ``X-API-Key`` header; otherwise a JSON error response is returned and
    the request never reaches the downstream handler.

    Public endpoints (health, docs) are excluded from authentication.
    """

    # Endpoints that don't require authentication.
    #
    # Matching rules (see ``_is_public_path``):
    #   * an entry WITHOUT a trailing slash ("/docs") covers that path and
    #     any sub-path below it ("/docs/oauth2-redirect"), but not siblings
    #     that merely share the prefix ("/docsecret");
    #   * an entry WITH a trailing slash ("/", "/api/") names only that
    #     exact index endpoint, so "/api/" does not expose all of "/api/...".
    PUBLIC_PATHS: set[str] = {
        "/api/health",
        "/api/ready",
        "/api/languages",
        "/api/",
        "/docs",
        "/redoc",
        "/openapi.json",
        "/",
    }

    def _is_public_path(self, path: str) -> bool:
        """
        Tell whether ``path`` may be served without an API key.

        Args:
            path: Request path with trailing slashes already stripped
                ("/" for the root).

        Returns:
            True if the path matches an entry of ``PUBLIC_PATHS``.
        """
        for public in self.PUBLIC_PATHS:
            normalized = public.rstrip("/") or "/"
            if path == normalized:
                return True
            # Only entries without a trailing slash act as prefixes, and only
            # on a "/" boundary. A plain ``startswith(normalized)`` would let
            # "/api/" (normalized to "/api") make every API route public.
            if not public.endswith("/") and path.startswith(normalized + "/"):
                return True
        return False

    @staticmethod
    def _is_valid_key(candidate: str, valid_keys) -> bool:
        """
        Check ``candidate`` against every configured key in constant time.

        Args:
            candidate: Key supplied by the client.
            valid_keys: Iterable of configured keys (assumed to be ``str``
                values -- TODO confirm against ``settings.api_keys_list``).

        Returns:
            True if the candidate equals one of the configured keys.
        """
        candidate_bytes = candidate.encode("utf-8")
        matched = False
        # Deliberately no early exit: comparing against all keys avoids
        # leaking, through response timing, which key (if any) matched.
        for key in valid_keys:
            if hmac.compare_digest(candidate_bytes, key.encode("utf-8")):
                matched = True
        return matched

    @staticmethod
    def _unauthorized(message: str) -> JSONResponse:
        """Build the 401 response used for missing and invalid keys."""
        return JSONResponse(
            status_code=401,
            content={
                "status": "error",
                "message": message,
            },
            headers={"WWW-Authenticate": "ApiKey"},
        )

    async def dispatch(self, request: Request, call_next):
        """
        Process request and validate API key for protected endpoints.

        Args:
            request: Incoming request
            call_next: Next middleware/handler

        Returns:
            Response from the next handler; a 401 JSON response when the
            key is missing or invalid; a 500 JSON response when no API keys
            are configured.
        """
        # Normalise so "/api/health/" and "/api/health" are treated alike.
        path = request.url.path.rstrip("/") or "/"

        # Skip authentication for public paths
        if self._is_public_path(path):
            return await call_next(request)

        # Get API key from header (an empty value counts as missing)
        api_key = request.headers.get("x-api-key")

        if not api_key:
            logger.warning(
                "Request without API key",
                path=path,
                method=request.method,
            )
            return self._unauthorized("API key is required")

        # Validate API key
        settings = get_settings()
        valid_keys = settings.api_keys_list

        if not valid_keys:
            # Misconfiguration: fail closed rather than letting requests in.
            logger.error("No API keys configured")
            return JSONResponse(
                status_code=500,
                content={
                    "status": "error",
                    "message": "Server configuration error",
                },
            )

        if not self._is_valid_key(api_key, valid_keys):
            logger.warning(
                "Invalid API key",
                path=path,
                # Log only a short prefix, never the full submitted key.
                key_prefix=api_key[:8] + "..." if len(api_key) > 8 else "***",
            )
            return self._unauthorized("Invalid API key")

        # Continue to next handler
        return await call_next(request)