""" Request Size Limits Middleware for AegisLM Provides middleware to enforce request size limits for API protection. """ import os from typing import Callable, Optional from fastapi import FastAPI, Request, Response from fastapi.responses import JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import ASGIApp class RequestSizeLimitMiddleware(BaseHTTPMiddleware): """ Middleware to enforce maximum request size limits. Protects against: - Denial of Service (DoS) attacks via large payloads - Memory exhaustion from oversized requests - Buffer overflow attacks """ def __init__( self, app: ASGIApp, max_request_size_bytes: Optional[int] = None, ): """ Initialize the middleware. Args: app: ASGI application max_request_size_bytes: Maximum request size in bytes. Defaults to 10MB if not specified. """ super().__init__(app) self.max_request_size_bytes = max_request_size_bytes or self._get_default_max_size() def _get_default_max_size(self) -> int: """Get default max request size from environment or use default.""" # Default: 10MB default_size = 10 * 1024 * 1024 # 10 MB env_size = os.getenv("AEGISLM_MAX_REQUEST_SIZE_BYTES") if env_size: try: return int(env_size) except ValueError: pass return default_size async def dispatch(self, request: Request, call_next: Callable) -> Response: """ Process the request and enforce size limits. Args: request: The incoming request call_next: The next middleware or route handler Returns: Response or error if request is too large """ # Get content length content_length = request.headers.get("content-length") if content_length: try: content_length = int(content_length) if content_length > self.max_request_size_bytes: return JSONResponse( status_code=413, # Payload Too Large content={ "error": "request_too_large", "message": f"Request body exceeds maximum allowed size of {self.max_request_size_bytes} bytes", "max_size_bytes": self.max_request_size_bytes, } ) except ValueError: pass # Also check for Content-Length mismatch during streaming try: body = await request.body() if len(body) > self.max_request_size_bytes: return JSONResponse( status_code=413, # Payload Too Large content={ "error": "request_too_large", "message": f"Request body exceeds maximum allowed size of {self.max_request_size_bytes} bytes", "max_size_bytes": self.max_request_size_bytes, } ) # Re-create request with body for downstream handlers async def receive(): return {"type": "http.request", "body": body} request._receive = receive except Exception: pass return await call_next(request) # ============================================================================= # Additional security limits # ============================================================================= class SecurityHeadersMiddleware(BaseHTTPMiddleware): """ Middleware to add security headers to responses. Adds headers for: - Content Security Policy - X-Frame-Options - X-Content-Type-Options - Strict-Transport-Security - X-XSS-Protection """ async def dispatch(self, request: Request, call_next: Callable) -> Response: """ Add security headers to the response. Args: request: The incoming request call_next: The next middleware or route handler Returns: Response with security headers """ response = await call_next(request) # Add security headers response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains" response.headers["Content-Security-Policy"] = "default-src 'self'" return response def get_request_size_limit() -> int: """ Get the configured request size limit. Returns: Maximum request size in bytes """ default_size = 10 * 1024 * 1024 # 10 MB env_size = os.getenv("AEGISLM_MAX_REQUEST_SIZE_BYTES") if env_size: try: return int(env_size) except ValueError: pass return default_size