Spaces:
Paused
Paused
File size: 4,851 Bytes
4a2ab42 4ae946d 4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | import os
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""
Comprehensive security headers middleware implementing defense-in-depth.
Protects against:
- XSS (Content-Security-Policy)
- Clickjacking (X-Frame-Options)
- MIME-type attacks (X-Content-Type-Options)
- Data leaks (Referrer-Policy)
- Unwanted features (Permissions-Policy)
"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Content Security Policy - Strict policy to prevent XSS
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' 'unsafe-eval'; " # Adjust 'unsafe-*' as needed
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https: blob:; "
"font-src 'self' data:; "
"connect-src 'self' ws: wss:; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self';"
)
# Clickjacking protection
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME-type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# XSS Protection (legacy but still useful for older browsers)
response.headers["X-XSS-Protection"] = "1; mode=block"
# Force HTTPS (only in production)
environment = os.getenv("ENVIRONMENT", "development").lower()
if environment != "development":
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
# Referrer Policy - Limit information leakage
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy - Disable unnecessary browser features
response.headers["Permissions-Policy"] = (
"geolocation=(), "
"microphone=(), "
"camera=(), "
"payment=(), "
"usb=(), "
"magnetometer=(), "
"gyroscope=(), "
"accelerometer=()"
)
# Cross-Origin policies for additional isolation
response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
response.headers["Cross-Origin-Resource-Policy"] = "same-origin"
# Remove server identity to prevent information disclosure
if "server" in response.headers:
del response.headers["server"]
# Add custom security header for version tracking (optional)
response.headers["X-Security-Version"] = "1.0.0"
return response
class TrustedHostMiddleware(BaseHTTPMiddleware):
"""
Middleware for mTLS and Trusted Host validation placeholders.
In real deployment, mTLS is terminated at the Load Balancer / Mesh (Istio),
but this layer validates the forwarded client certificate headers.
"""
async def dispatch(self, request: Request, call_next):
# Placeholder: Verify X-Forwarded-Client-Cert if we were in stricter mode
# cert = request.headers.get("X-Forwarded-Client-Cert")
# if not cert and request.url.path not in ["/health", "/metrics", "/docs", "/openapi.json"]:
# ...
return await call_next(request)
class ZeroTrustMiddleware(BaseHTTPMiddleware):
"""
Middleware to enforce strict API Key validation for backend-to-backend communication
between Electron (Main Process) and FastAPI.
"""
def __init__(self, app, api_key: str | None = None):
super().__init__(app)
self.api_key = api_key or os.getenv("BACKEND_API_KEY")
self.exempt_paths = [
"/health",
"/health/ready",
"/health/live",
"/metrics",
"/docs",
"/openapi.json",
"/redoc",
]
async def dispatch(self, request: Request, call_next):
# Exempt health checks and documentation
if request.url.path in self.exempt_paths:
return await call_next(request)
# Check for API Key in headers
provided_key = request.headers.get("X-API-Key")
if not self.api_key:
# If no API key is configured, allow for now but log warning
# In true Zero Trust, we would fail here.
return await call_next(request)
if provided_key != self.api_key:
from starlette.responses import JSONResponse
return JSONResponse(
status_code=403,
content={"detail": "Zero-Trust Violation: Invalid or missing API Key"},
)
return await call_next(request)
|