| from __future__ import annotations |
| import hashlib |
| import hmac |
| import os |
| import time |
| from collections import defaultdict |
| from fastapi import APIRouter, Header, HTTPException, Request |
| from pydantic import BaseModel |
|
|
| router = APIRouter(prefix="/api/auth", tags=["auth"]) |
| SECRET = os.environ.get("APERTURE_SECRET", "aperture-mvp-secret-change-in-production") |
| DEMO_MODE = os.environ.get("APERTURE_DEMO", "true").lower() == "true" |
|
|
| |
| _RATE_LIMIT = 5 |
| _RATE_WINDOW = 300 |
| _rate_buckets: dict[str, list[float]] = defaultdict(list) |
|
|
|
|
| def _check_rate_limit(key: str) -> None: |
| now = time.time() |
| bucket = _rate_buckets[key] |
| |
| _rate_buckets[key] = [t for t in bucket if now - t < _RATE_WINDOW] |
| if len(_rate_buckets[key]) >= _RATE_LIMIT: |
| raise HTTPException(status_code=429, detail="Too many requests. Try again later.") |
| _rate_buckets[key].append(now) |
|
|
| class AuthRequest(BaseModel): |
| email: str |
|
|
| class VerifyRequest(BaseModel): |
| email: str |
| token: str |
|
|
| @router.post("/request") |
| async def request_magic_link(req: AuthRequest, request: Request): |
| _check_rate_limit(request.client.host if request.client else "unknown") |
| token = _generate_token(req.email) |
| resp = {"message": "Magic link sent"} |
| if DEMO_MODE: |
| resp["demo_token"] = token |
| return resp |
|
|
| @router.post("/verify") |
| async def verify_token(req: VerifyRequest): |
| expected = _generate_token(req.email) |
| if not hmac.compare_digest(req.token, expected): |
| raise HTTPException(status_code=401, detail="Invalid or expired token") |
| return {"email": req.email, "verified": True} |
|
|
| async def get_current_user(authorization: str = Header(default=None)) -> str: |
| """FastAPI dependency: extract and verify email from Authorization header. |
| |
| Expected format: Bearer <email>:<token> |
| Returns the verified email address. |
| """ |
| if not authorization: |
| raise HTTPException(status_code=401, detail="Authorization header missing") |
| parts = authorization.split(" ", 1) |
| if len(parts) != 2 or parts[0] != "Bearer": |
| raise HTTPException(status_code=401, detail="Invalid authorization format") |
| payload = parts[1] |
| if ":" not in payload: |
| raise HTTPException(status_code=401, detail="Invalid token format") |
| email, token = payload.split(":", 1) |
| |
| for offset in (0, -1): |
| expected = _generate_token_for_hour(email, offset) |
| if hmac.compare_digest(token, expected): |
| return email |
| raise HTTPException(status_code=401, detail="Invalid or expired token") |
|
|
|
|
| def _generate_token_for_hour(email: str, hour_offset: int = 0) -> str: |
| """Generate token for a specific hour offset (0 = current, -1 = previous).""" |
| hour = int(time.time() // 3600) + hour_offset |
| payload = f"{email}:{hour}:{SECRET}" |
| return hashlib.sha256(payload.encode()).hexdigest()[:32] |
|
|
|
|
| def _generate_token(email: str) -> str: |
| return _generate_token_for_hour(email, 0) |
|
|