topic-analysis / backend /app /core /security.py
alexchilton
Initial deployment: Sentiment & Topic Analysis Dashboard
6242ddb
"""Security utilities: API key validation, webhook signature verification, rate limiting."""
from __future__ import annotations
import hashlib
import hmac
import time
from typing import Optional
from fastapi import HTTPException, Request, Security
from fastapi.security import APIKeyHeader
from slowapi import Limiter
from slowapi.util import get_remote_address
from app.core.config import settings
api_key_header = APIKeyHeader(name=settings.api_key_header, auto_error=False)
def get_api_key(api_key: Optional[str] = Security(api_key_header)) -> str:
if not api_key or api_key not in settings.allowed_api_keys:
raise HTTPException(status_code=403, detail="Invalid or missing API key")
return api_key
def _key_func(request: Request) -> str:
api_key = request.headers.get(settings.api_key_header, "")
if api_key:
return api_key
return get_remote_address(request)
limiter = Limiter(key_func=_key_func, default_limits=[f"{settings.rate_limit_per_minute}/minute"])
def verify_webhook_signature(payload: bytes, signature: str, timestamp: str) -> bool:
"""Verify Stripe-style webhook signature (t=timestamp,v1=signature)."""
if not signature or not timestamp:
return False
try:
ts = int(timestamp)
except (ValueError, TypeError):
return False
if abs(time.time() - ts) > 300:
return False
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
expected = hmac.new(
settings.webhook_secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
).hexdigest()
parts = signature.split(",")
for part in parts:
if part.startswith("v1="):
sig_value = part[3:]
if hmac.compare_digest(expected, sig_value):
return True
return False