Spaces:
Sleeping
Sleeping
File size: 1,808 Bytes
7a0a3dd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | from collections import defaultdict, deque
from time import monotonic
from fastapi import HTTPException, Request
from config import TELEGRAM_WEBHOOK_SECRET
FAILED_ATTEMPT_WINDOW_SECONDS = 60
FAILED_ATTEMPT_LIMIT = 5
BLOCK_DURATION_SECONDS = 15 * 60
_failed_secret_attempts: dict[str, deque[float]] = defaultdict(deque)
_blocked_clients: dict[str, float] = {}
def _get_client_key(request: Request) -> str:
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
if request.client and request.client.host:
return request.client.host
return "unknown"
def _prune_failed_attempts(client_key: str, now: float) -> deque[float]:
attempts = _failed_secret_attempts[client_key]
cutoff = now - FAILED_ATTEMPT_WINDOW_SECONDS
while attempts and attempts[0] < cutoff:
attempts.popleft()
return attempts
def validate_webhook_secret(request: Request, secret_header: str | None) -> None:
if not TELEGRAM_WEBHOOK_SECRET:
raise HTTPException(status_code=500, detail="Webhook secret is not configured")
client_key = _get_client_key(request)
now = monotonic()
blocked_until = _blocked_clients.get(client_key)
if blocked_until and now < blocked_until:
raise HTTPException(status_code=429, detail="Too many requests")
if secret_header != TELEGRAM_WEBHOOK_SECRET:
attempts = _prune_failed_attempts(client_key, now)
attempts.append(now)
if len(attempts) >= FAILED_ATTEMPT_LIMIT:
_blocked_clients[client_key] = now + BLOCK_DURATION_SECONDS
attempts.clear()
raise HTTPException(status_code=403, detail="Forbidden")
_blocked_clients.pop(client_key, None)
_failed_secret_attempts.pop(client_key, None)
|