Spaces:
Running
Running
| from collections import defaultdict, deque | |
| from time import monotonic | |
| from fastapi import HTTPException, Request | |
| from config import TELEGRAM_WEBHOOK_SECRET | |
| FAILED_ATTEMPT_WINDOW_SECONDS = 60 | |
| FAILED_ATTEMPT_LIMIT = 5 | |
| BLOCK_DURATION_SECONDS = 15 * 60 | |
| _failed_secret_attempts: dict[str, deque[float]] = defaultdict(deque) | |
| _blocked_clients: dict[str, float] = {} | |
| def _get_client_key(request: Request) -> str: | |
| forwarded_for = request.headers.get("x-forwarded-for") | |
| if forwarded_for: | |
| return forwarded_for.split(",")[0].strip() | |
| if request.client and request.client.host: | |
| return request.client.host | |
| return "unknown" | |
| def _prune_failed_attempts(client_key: str, now: float) -> deque[float]: | |
| attempts = _failed_secret_attempts[client_key] | |
| cutoff = now - FAILED_ATTEMPT_WINDOW_SECONDS | |
| while attempts and attempts[0] < cutoff: | |
| attempts.popleft() | |
| return attempts | |
| def validate_webhook_secret(request: Request, secret_header: str | None) -> None: | |
| if not TELEGRAM_WEBHOOK_SECRET: | |
| raise HTTPException(status_code=500, detail="Webhook secret is not configured") | |
| client_key = _get_client_key(request) | |
| now = monotonic() | |
| blocked_until = _blocked_clients.get(client_key) | |
| if blocked_until and now < blocked_until: | |
| raise HTTPException(status_code=429, detail="Too many requests") | |
| if secret_header != TELEGRAM_WEBHOOK_SECRET: | |
| attempts = _prune_failed_attempts(client_key, now) | |
| attempts.append(now) | |
| if len(attempts) >= FAILED_ATTEMPT_LIMIT: | |
| _blocked_clients[client_key] = now + BLOCK_DURATION_SECONDS | |
| attempts.clear() | |
| raise HTTPException(status_code=403, detail="Forbidden") | |
| _blocked_clients.pop(client_key, None) | |
| _failed_secret_attempts.pop(client_key, None) | |