Spaces:
Sleeping
Sleeping
| """Security utilities: API key validation, webhook signature verification, rate limiting.""" | |
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| import time | |
| from typing import Optional | |
| from fastapi import HTTPException, Request, Security | |
| from fastapi.security import APIKeyHeader | |
| from slowapi import Limiter | |
| from slowapi.util import get_remote_address | |
| from app.core.config import settings | |
| api_key_header = APIKeyHeader(name=settings.api_key_header, auto_error=False) | |
| def get_api_key(api_key: Optional[str] = Security(api_key_header)) -> str: | |
| if not api_key or api_key not in settings.allowed_api_keys: | |
| raise HTTPException(status_code=403, detail="Invalid or missing API key") | |
| return api_key | |
| def _key_func(request: Request) -> str: | |
| api_key = request.headers.get(settings.api_key_header, "") | |
| if api_key: | |
| return api_key | |
| return get_remote_address(request) | |
| limiter = Limiter(key_func=_key_func, default_limits=[f"{settings.rate_limit_per_minute}/minute"]) | |
| def verify_webhook_signature(payload: bytes, signature: str, timestamp: str) -> bool: | |
| """Verify Stripe-style webhook signature (t=timestamp,v1=signature).""" | |
| if not signature or not timestamp: | |
| return False | |
| try: | |
| ts = int(timestamp) | |
| except (ValueError, TypeError): | |
| return False | |
| if abs(time.time() - ts) > 300: | |
| return False | |
| signed_payload = f"{timestamp}.{payload.decode('utf-8')}" | |
| expected = hmac.new( | |
| settings.webhook_secret.encode("utf-8"), | |
| signed_payload.encode("utf-8"), | |
| hashlib.sha256, | |
| ).hexdigest() | |
| parts = signature.split(",") | |
| for part in parts: | |
| if part.startswith("v1="): | |
| sig_value = part[3:] | |
| if hmac.compare_digest(expected, sig_value): | |
| return True | |
| return False | |