| """In-memory sliding-window rate limiter for abuse-sensitive endpoints.""" |
| from __future__ import annotations |
|
|
| import time |
| from collections import defaultdict, deque |
| from typing import Deque, DefaultDict |
|
|
| from fastapi import HTTPException, Request, status |
|
|
|
|
| class RateLimiter: |
| def __init__(self, max_requests: int, window_seconds: int): |
| self.max_requests = max_requests |
| self.window_seconds = window_seconds |
| self._hits: DefaultDict[str, Deque[float]] = defaultdict(deque) |
|
|
| def _key(self, request: Request, bucket: str) -> str: |
| client = request.client.host if request.client else "unknown" |
| return f"{bucket}:{client}" |
|
|
| def check(self, request: Request, bucket: str) -> None: |
| now = time.time() |
| key = self._key(request, bucket) |
| q = self._hits[key] |
| while q and now - q[0] > self.window_seconds: |
| q.popleft() |
| if len(q) >= self.max_requests: |
| raise HTTPException( |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| detail="Too many requests. Try again shortly.", |
| ) |
| q.append(now) |
|
|
| def check_ws(self, websocket, bucket: str) -> None: |
| """Rate limit WebSocket actions by client host.""" |
| now = time.time() |
| client = websocket.client.host if websocket.client else "ws-unknown" |
| key = f"{bucket}:ws:{client}" |
| q = self._hits[key] |
| while q and now - q[0] > self.window_seconds: |
| q.popleft() |
| if len(q) >= self.max_requests: |
| raise HTTPException( |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
| detail="Too many requests. Try again shortly.", |
| ) |
| q.append(now) |
|
|
|
|
| login_limiter = RateLimiter(max_requests=20, window_seconds=60) |
| refresh_limiter = RateLimiter(max_requests=30, window_seconds=60) |
| sos_limiter = RateLimiter(max_requests=10, window_seconds=60) |
|
|