File size: 4,851 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    Comprehensive security headers middleware implementing defense-in-depth.

    Protects against:
    - XSS (Content-Security-Policy)
    - Clickjacking (X-Frame-Options)
    - MIME-type attacks (X-Content-Type-Options)
    - Data leaks (Referrer-Policy)
    - Unwanted features (Permissions-Policy)
    """

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Content Security Policy - Strict policy to prevent XSS
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline' 'unsafe-eval'; "  # Adjust 'unsafe-*' as needed
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https: blob:; "
            "font-src 'self' data:; "
            "connect-src 'self' ws: wss:; "
            "frame-ancestors 'none'; "
            "base-uri 'self'; "
            "form-action 'self';"
        )

        # Clickjacking protection
        response.headers["X-Frame-Options"] = "DENY"

        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

        # XSS Protection (legacy but still useful for older browsers)
        response.headers["X-XSS-Protection"] = "1; mode=block"

        # Force HTTPS (only in production)
        environment = os.getenv("ENVIRONMENT", "development").lower()
        if environment != "development":
            response.headers["Strict-Transport-Security"] = (
                "max-age=31536000; includeSubDomains; preload"
            )

        # Referrer Policy - Limit information leakage
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Permissions Policy - Disable unnecessary browser features
        response.headers["Permissions-Policy"] = (
            "geolocation=(), "
            "microphone=(), "
            "camera=(), "
            "payment=(), "
            "usb=(), "
            "magnetometer=(), "
            "gyroscope=(), "
            "accelerometer=()"
        )

        # Cross-Origin policies for additional isolation
        response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        response.headers["Cross-Origin-Resource-Policy"] = "same-origin"

        # Remove server identity to prevent information disclosure
        if "server" in response.headers:
            del response.headers["server"]

        # Add custom security header for version tracking (optional)
        response.headers["X-Security-Version"] = "1.0.0"

        return response


class TrustedHostMiddleware(BaseHTTPMiddleware):
    """
    Middleware for mTLS and Trusted Host validation placeholders.
    In real deployment, mTLS is terminated at the Load Balancer / Mesh (Istio),
    but this layer validates the forwarded client certificate headers.
    """

    async def dispatch(self, request: Request, call_next):
        # Placeholder: Verify X-Forwarded-Client-Cert if we were in stricter mode
        # cert = request.headers.get("X-Forwarded-Client-Cert")
        # if not cert and request.url.path not in ["/health", "/metrics", "/docs", "/openapi.json"]:
        #    ...

        return await call_next(request)


class ZeroTrustMiddleware(BaseHTTPMiddleware):
    """
    Middleware to enforce strict API Key validation for backend-to-backend communication
    between Electron (Main Process) and FastAPI.
    """

    def __init__(self, app, api_key: str | None = None):
        super().__init__(app)
        self.api_key = api_key or os.getenv("BACKEND_API_KEY")
        self.exempt_paths = [
            "/health",
            "/health/ready",
            "/health/live",
            "/metrics",
            "/docs",
            "/openapi.json",
            "/redoc",
        ]

    async def dispatch(self, request: Request, call_next):
        # Exempt health checks and documentation
        if request.url.path in self.exempt_paths:
            return await call_next(request)

        # Check for API Key in headers
        provided_key = request.headers.get("X-API-Key")

        if not self.api_key:
            # If no API key is configured, allow for now but log warning
            # In true Zero Trust, we would fail here.
            return await call_next(request)

        if provided_key != self.api_key:
            from starlette.responses import JSONResponse

            return JSONResponse(
                status_code=403,
                content={"detail": "Zero-Trust Violation: Invalid or missing API Key"},
            )

        return await call_next(request)