from collections import defaultdict
from time import perf_counter, time

from fastapi import Request, status

from src.common.utils import get_client_ip, response_error


class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary string.

    A key may make at most ``max_requests`` requests within any span of
    ``window`` seconds. State lives in this process only.
    """

    def __init__(self, max_requests: int = 10, window: int = 60):
        """Configure the limiter.

        Args:
            max_requests: Number of requests allowed per key per window.
            window: Length of the sliding window, in seconds.
        """
        self.max_requests = max_requests
        self.window = window
        # key -> timestamps (from time()) of accepted requests, oldest first.
        self.requests = defaultdict(list)
        # Earliest time at which the next idle-key sweep may run.
        self._next_sweep = 0.0

    def _evict_idle(self, now: float) -> None:
        """Drop keys with no request inside the current window.

        Runs at most once per ``window`` seconds so the full scan is
        amortised; without it every key ever seen would stay in memory.
        """
        if now < self._next_sweep:
            return
        self._next_sweep = now + self.window
        idle_keys = [
            key
            for key, stamps in self.requests.items()
            # Timestamps are appended in call order, so the last is the newest.
            if not stamps or now - stamps[-1] >= self.window
        ]
        for key in idle_keys:
            del self.requests[key]

    def is_allowed(self, key: str) -> bool:
        """Record a request for ``key`` and report whether it is permitted.

        Args:
            key: Identifier to rate-limit on.

        Returns:
            True if the request fits in the current window (and was recorded),
            False if ``key`` already has ``max_requests`` requests in it.
        """
        now = time()
        self._evict_idle(now)

        # .get() avoids the defaultdict creating an entry for a key that ends
        # up with nothing to store.
        recent = [
            req_time
            for req_time in self.requests.get(key, ())
            if now - req_time < self.window
        ]

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)

        # Only keep non-empty windows so the dict does not accumulate dead keys.
        if recent:
            self.requests[key] = recent
        else:
            self.requests.pop(key, None)
        return allowed


rate_limiter = RateLimiter(max_requests=20, window=60)


def register_middlewares(app, logger):
    """Attach the request-timing and rate-limiting HTTP middlewares to ``app``.

    Args:
        app: Application object exposing a ``middleware("http")`` decorator.
        logger: Logger used for the access and rate-limit messages; only its
            ``info`` and ``warning`` methods are called.
    """
    # NOTE(review): assuming ``app`` is a FastAPI/Starlette application, the
    # middleware registered last wraps the earlier ones, so the rate limiter
    # would run before the timing logger and rejected requests would produce
    # no Started/Completed lines - confirm this ordering is intended.

    @app.middleware("http")
    async def calculator_time(request: Request, call_next):
        """Log the start and completion (status, duration) of each request."""
        start = perf_counter()
        client = get_client_ip(request)
        method = request.method
        path = request.url.path

        logger.info(f"{method} {path} - Client: {client} - Started")

        response = await call_next(request)

        duration = perf_counter() - start
        # Named status_code so it does not shadow the imported fastapi.status.
        status_code = response.status_code
        logger.info(
            f"{method} {path} - Client: {client} - Status: {status_code} - Completed in {duration:.3f}s"
        )
        return response

    @app.middleware("http")
    async def rate_limit_middleware(request: Request, call_next):
        """Reject the request with HTTP 429 once the client exceeds the limit."""
        client_ip = get_client_ip(request)

        if not rate_limiter.is_allowed(client_ip):
            logger.warning(f"Rate limit exceeded: {client_ip}")
            return response_error(
                "RATE_LIMIT",
                "Too many requests. Try later.",
                status.HTTP_429_TOO_MANY_REQUESTS,
            )

        return await call_next(request)