"""In-memory sliding-window rate limiter for abuse-sensitive endpoints.""" from __future__ import annotations import time from collections import defaultdict, deque from typing import Deque, DefaultDict from fastapi import HTTPException, Request, status class RateLimiter: def __init__(self, max_requests: int, window_seconds: int): self.max_requests = max_requests self.window_seconds = window_seconds self._hits: DefaultDict[str, Deque[float]] = defaultdict(deque) def _key(self, request: Request, bucket: str) -> str: client = request.client.host if request.client else "unknown" return f"{bucket}:{client}" def check(self, request: Request, bucket: str) -> None: now = time.time() key = self._key(request, bucket) q = self._hits[key] while q and now - q[0] > self.window_seconds: q.popleft() if len(q) >= self.max_requests: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Try again shortly.", ) q.append(now) def check_ws(self, websocket, bucket: str) -> None: """Rate limit WebSocket actions by client host.""" now = time.time() client = websocket.client.host if websocket.client else "ws-unknown" key = f"{bucket}:ws:{client}" q = self._hits[key] while q and now - q[0] > self.window_seconds: q.popleft() if len(q) >= self.max_requests: raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail="Too many requests. Try again shortly.", ) q.append(now) login_limiter = RateLimiter(max_requests=20, window_seconds=60) refresh_limiter = RateLimiter(max_requests=30, window_seconds=60) sos_limiter = RateLimiter(max_requests=10, window_seconds=60)