| """
|
| Request Size Limits Middleware for AegisLM
|
|
|
| Provides middleware to enforce request size limits for API protection.
|
| """
|
|
|
| import os
|
| from typing import Callable, Optional
|
|
|
| from fastapi import FastAPI, Request, Response
|
| from fastapi.responses import JSONResponse
|
| from starlette.middleware.base import BaseHTTPMiddleware
|
| from starlette.types import ASGIApp
|
|
|
|
|
| class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
|
| """
|
| Middleware to enforce maximum request size limits.
|
|
|
| Protects against:
|
| - Denial of Service (DoS) attacks via large payloads
|
| - Memory exhaustion from oversized requests
|
| - Buffer overflow attacks
|
| """
|
|
|
| def __init__(
|
| self,
|
| app: ASGIApp,
|
| max_request_size_bytes: Optional[int] = None,
|
| ):
|
| """
|
| Initialize the middleware.
|
|
|
| Args:
|
| app: ASGI application
|
| max_request_size_bytes: Maximum request size in bytes.
|
| Defaults to 10MB if not specified.
|
| """
|
| super().__init__(app)
|
| self.max_request_size_bytes = max_request_size_bytes or self._get_default_max_size()
|
|
|
| def _get_default_max_size(self) -> int:
|
| """Get default max request size from environment or use default."""
|
|
|
| default_size = 10 * 1024 * 1024
|
|
|
| env_size = os.getenv("AEGISLM_MAX_REQUEST_SIZE_BYTES")
|
| if env_size:
|
| try:
|
| return int(env_size)
|
| except ValueError:
|
| pass
|
|
|
| return default_size
|
|
|
| async def dispatch(self, request: Request, call_next: Callable) -> Response:
|
| """
|
| Process the request and enforce size limits.
|
|
|
| Args:
|
| request: The incoming request
|
| call_next: The next middleware or route handler
|
|
|
| Returns:
|
| Response or error if request is too large
|
| """
|
|
|
| content_length = request.headers.get("content-length")
|
|
|
| if content_length:
|
| try:
|
| content_length = int(content_length)
|
|
|
| if content_length > self.max_request_size_bytes:
|
| return JSONResponse(
|
| status_code=413,
|
| content={
|
| "error": "request_too_large",
|
| "message": f"Request body exceeds maximum allowed size of {self.max_request_size_bytes} bytes",
|
| "max_size_bytes": self.max_request_size_bytes,
|
| }
|
| )
|
| except ValueError:
|
| pass
|
|
|
|
|
| try:
|
| body = await request.body()
|
| if len(body) > self.max_request_size_bytes:
|
| return JSONResponse(
|
| status_code=413,
|
| content={
|
| "error": "request_too_large",
|
| "message": f"Request body exceeds maximum allowed size of {self.max_request_size_bytes} bytes",
|
| "max_size_bytes": self.max_request_size_bytes,
|
| }
|
| )
|
|
|
|
|
| async def receive():
|
| return {"type": "http.request", "body": body}
|
|
|
| request._receive = receive
|
|
|
| except Exception:
|
| pass
|
|
|
| return await call_next(request)
|
|
|
|
|
|
|
|
|
|
|
|
|
| class SecurityHeadersMiddleware(BaseHTTPMiddleware):
|
| """
|
| Middleware to add security headers to responses.
|
|
|
| Adds headers for:
|
| - Content Security Policy
|
| - X-Frame-Options
|
| - X-Content-Type-Options
|
| - Strict-Transport-Security
|
| - X-XSS-Protection
|
| """
|
|
|
| async def dispatch(self, request: Request, call_next: Callable) -> Response:
|
| """
|
| Add security headers to the response.
|
|
|
| Args:
|
| request: The incoming request
|
| call_next: The next middleware or route handler
|
|
|
| Returns:
|
| Response with security headers
|
| """
|
| response = await call_next(request)
|
|
|
|
|
| response.headers["X-Content-Type-Options"] = "nosniff"
|
| response.headers["X-Frame-Options"] = "DENY"
|
| response.headers["X-XSS-Protection"] = "1; mode=block"
|
| response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
|
| response.headers["Content-Security-Policy"] = "default-src 'self'"
|
|
|
| return response
|
|
|
|
|
| def get_request_size_limit() -> int:
|
| """
|
| Get the configured request size limit.
|
|
|
| Returns:
|
| Maximum request size in bytes
|
| """
|
| default_size = 10 * 1024 * 1024
|
| env_size = os.getenv("AEGISLM_MAX_REQUEST_SIZE_BYTES")
|
| if env_size:
|
| try:
|
| return int(env_size)
|
| except ValueError:
|
| pass
|
| return default_size
|
|
|