Aperture / app /api /auth.py
KSvend
security: fix hardcoded secret, gate demo mode, add CORS and rate limiting
3b71d95
from __future__ import annotations
import hashlib
import hmac
import os
import time
from collections import defaultdict
from fastapi import APIRouter, Header, HTTPException, Request
from pydantic import BaseModel
# Router for the authentication endpoints; routes registered on it are
# served under the /api/auth prefix and tagged "auth".
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Secret mixed into every token hash (see _generate_token_for_hour).
# NOTE(review): when APERTURE_SECRET is unset this silently falls back to the
# literal default below, which is visible in source control. Deployments must
# set the environment variable; consider failing fast when it is missing.
SECRET = os.environ.get("APERTURE_SECRET", "aperture-mvp-secret-change-in-production")
# When true, POST /request includes the freshly generated token in its
# response body as "demo_token".
# NOTE(review): the default is "true", so demo mode is ON unless
# APERTURE_DEMO is explicitly set to something else (compared
# case-insensitively against "true").
DEMO_MODE = os.environ.get("APERTURE_DEMO", "true").lower() == "true"
# Simple in-memory rate limiter: at most _RATE_LIMIT requests per key within
# any sliding window of _RATE_WINDOW seconds. State lives in this module only.
_RATE_LIMIT = 5  # max requests allowed per key within one window
_RATE_WINDOW = 300  # window length in seconds (5 minutes)
# Maps a limiter key to the timestamps (time.time() values) of that key's
# accepted requests; a missing key starts out as an empty list.
_rate_buckets: dict[str, list[float]] = defaultdict(list)
def _check_rate_limit(key: str) -> None:
    """Record one request for *key*, rejecting it if the key is over its limit.

    Timestamps older than ``_RATE_WINDOW`` seconds are discarded first. If
    ``_RATE_LIMIT`` or more remain, the request is refused and is not
    recorded; otherwise the current time is added to the key's bucket.

    Raises:
        HTTPException: 429 when the key has used up its allowance for the
            current window.
    """
    current = time.time()
    # Keep only the timestamps that still fall inside the sliding window.
    recent = [stamp for stamp in _rate_buckets[key] if current - stamp < _RATE_WINDOW]
    _rate_buckets[key] = recent
    # NOTE(review): nothing in this function removes buckets of keys that stop
    # sending requests, so the mapping grows with the number of distinct keys.
    if len(recent) >= _RATE_LIMIT:
        raise HTTPException(status_code=429, detail="Too many requests. Try again later.")
    recent.append(current)
# Request body for POST /api/auth/request.
class AuthRequest(BaseModel):
    # Address the token is generated for. Declared as a plain string, so this
    # model applies no email-format validation.
    email: str
# Request body for POST /api/auth/verify.
class VerifyRequest(BaseModel):
    # Address the token is claimed to belong to (plain string, not validated
    # as an email address by this model).
    email: str
    # Token to check against the one derived for `email`.
    token: str
@router.post("/request")
async def request_magic_link(req: AuthRequest, request: Request):
    """Issue a magic-link token for the supplied email address.

    Requests are rate-limited per client host. In demo mode the generated
    token is included in the response as ``demo_token``.
    """
    # Fall back to a shared "unknown" bucket when the client address is
    # unavailable.
    client = request.client
    _check_rate_limit(client.host if client else "unknown")

    token = _generate_token(req.email)
    # NOTE(review): no delivery step (e.g. sending an email) is visible in
    # this handler; outside demo mode the token is generated and discarded.
    if DEMO_MODE:
        return {"message": "Magic link sent", "demo_token": token}
    return {"message": "Magic link sent"}
@router.post("/verify")
async def verify_token(req: VerifyRequest):
    """Check that a token matches the supplied email address.

    A token is accepted when it matches the value derived for the current
    hour or for the previous hour, so a token issued shortly before the hour
    rolls over does not expire immediately.

    Returns a body with the email and ``verified: true`` on success and
    responds with 401 when the token does not match.
    """
    # Compare as bytes: hmac.compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would surface as a 500 for
    # attacker-controlled input. The expected token is ASCII hex, so any
    # replaced or multi-byte character can never produce a match.
    supplied = req.token.encode("utf-8", "replace")
    # Same acceptance window as get_current_user: current and previous hour.
    for offset in (0, -1):
        expected = _generate_token_for_hour(req.email, offset).encode("utf-8")
        if hmac.compare_digest(supplied, expected):
            return {"email": req.email, "verified": True}
    raise HTTPException(status_code=401, detail="Invalid or expired token")
async def get_current_user(authorization: str = Header(default=None)) -> str:
    """FastAPI dependency: extract and verify the email from the Authorization header.

    Expected format: ``Bearer <email>:<token>``. The scheme name is matched
    case-insensitively. The credentials are split at the first colon, and the
    token is accepted when it matches the value derived for the current or
    the previous hour.

    Returns:
        The verified email address.

    Raises:
        HTTPException: 401 when the header is missing, malformed, or carries
            a token that does not match.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header missing")

    scheme, separator, credentials = authorization.partition(" ")
    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted alongside "Bearer".
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Invalid authorization format")

    if ":" not in credentials:
        raise HTTPException(status_code=401, detail="Invalid token format")
    email, token = credentials.split(":", 1)

    # Compare as bytes: hmac.compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which would turn a malformed header
    # into a 500. The expected token is ASCII hex, so such input simply fails
    # to match and gets the normal 401.
    supplied = token.encode("utf-8", "replace")
    # Verify against current and previous hour (handle clock edge)
    for offset in (0, -1):
        expected = _generate_token_for_hour(email, offset).encode("utf-8")
        if hmac.compare_digest(supplied, expected):
            return email
    raise HTTPException(status_code=401, detail="Invalid or expired token")
def _generate_token_for_hour(email: str, hour_offset: int = 0) -> str:
    """Derive the token for *email* in a given hour bucket.

    The hour bucket is the number of whole hours since the epoch, shifted by
    *hour_offset* (0 = current hour, -1 = previous hour). The token is the
    first 32 hex characters of the SHA-256 digest of ``email:hour:SECRET``.
    """
    hour_bucket = int(time.time() // 3600) + hour_offset
    material = "{}:{}:{}".format(email, hour_bucket, SECRET)
    # NOTE(review): this is a plain hash with the secret appended, not an
    # HMAC. Moving to hmac.new(...) would be the conventional construction
    # but changes every token value, so it needs a coordinated rollout.
    digest = hashlib.sha256(material.encode())
    return digest.hexdigest()[:32]
def _generate_token(email: str) -> str:
    """Return the token for *email* that is valid in the current hour."""
    return _generate_token_for_hour(email, hour_offset=0)