from __future__ import annotations import hashlib import hmac import os import time from collections import defaultdict from fastapi import APIRouter, Header, HTTPException, Request from pydantic import BaseModel router = APIRouter(prefix="/api/auth", tags=["auth"]) SECRET = os.environ.get("APERTURE_SECRET", "aperture-mvp-secret-change-in-production") DEMO_MODE = os.environ.get("APERTURE_DEMO", "true").lower() == "true" # Simple in-memory rate limiter: max requests per window _RATE_LIMIT = 5 # max requests _RATE_WINDOW = 300 # per 5-minute window (seconds) _rate_buckets: dict[str, list[float]] = defaultdict(list) def _check_rate_limit(key: str) -> None: now = time.time() bucket = _rate_buckets[key] # Prune old entries _rate_buckets[key] = [t for t in bucket if now - t < _RATE_WINDOW] if len(_rate_buckets[key]) >= _RATE_LIMIT: raise HTTPException(status_code=429, detail="Too many requests. Try again later.") _rate_buckets[key].append(now) class AuthRequest(BaseModel): email: str class VerifyRequest(BaseModel): email: str token: str @router.post("/request") async def request_magic_link(req: AuthRequest, request: Request): _check_rate_limit(request.client.host if request.client else "unknown") token = _generate_token(req.email) resp = {"message": "Magic link sent"} if DEMO_MODE: resp["demo_token"] = token return resp @router.post("/verify") async def verify_token(req: VerifyRequest): expected = _generate_token(req.email) if not hmac.compare_digest(req.token, expected): raise HTTPException(status_code=401, detail="Invalid or expired token") return {"email": req.email, "verified": True} async def get_current_user(authorization: str = Header(default=None)) -> str: """FastAPI dependency: extract and verify email from Authorization header. Expected format: Bearer : Returns the verified email address. """ if not authorization: raise HTTPException(status_code=401, detail="Authorization header missing") parts = authorization.split(" ", 1) if len(parts) != 2 or parts[0] != "Bearer": raise HTTPException(status_code=401, detail="Invalid authorization format") payload = parts[1] if ":" not in payload: raise HTTPException(status_code=401, detail="Invalid token format") email, token = payload.split(":", 1) # Verify against current and previous hour (handle clock edge) for offset in (0, -1): expected = _generate_token_for_hour(email, offset) if hmac.compare_digest(token, expected): return email raise HTTPException(status_code=401, detail="Invalid or expired token") def _generate_token_for_hour(email: str, hour_offset: int = 0) -> str: """Generate token for a specific hour offset (0 = current, -1 = previous).""" hour = int(time.time() // 3600) + hour_offset payload = f"{email}:{hour}:{SECRET}" return hashlib.sha256(payload.encode()).hexdigest()[:32] def _generate_token(email: str) -> str: return _generate_token_for_hour(email, 0)