File size: 3,124 Bytes
ae74af5
 
 
3b71d95
ae74af5
3b71d95
 
ae74af5
 
 
3b71d95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ae74af5
 
 
 
 
 
 
 
 
3b71d95
 
ae74af5
3b71d95
 
 
 
ae74af5
 
 
 
 
 
 
 
a7da499
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ae74af5
 
a7da499
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from __future__ import annotations
import hashlib
import hmac
import os
import time
from collections import defaultdict
from fastapi import APIRouter, Header, HTTPException, Request
from pydantic import BaseModel

# Router for the auth endpoints; every route below is mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])
# Secret mixed into every generated token. Read from APERTURE_SECRET; when the
# variable is unset the hard-coded fallback string below is used instead.
SECRET = os.environ.get("APERTURE_SECRET", "aperture-mvp-secret-change-in-production")
# Demo mode is ON unless APERTURE_DEMO is set to something other than "true"
# (case-insensitive). In demo mode /request returns the token in its response.
DEMO_MODE = os.environ.get("APERTURE_DEMO", "true").lower() == "true"

# Simple in-memory rate limiter: at most _RATE_LIMIT accepted requests per key
# within any _RATE_WINDOW-second span. State lives in this process only.
_RATE_LIMIT = 5           # max requests
_RATE_WINDOW = 300        # per 5-minute window (seconds)
# Maps a rate-limit key to the time.time() stamps of its accepted requests.
_rate_buckets: dict[str, list[float]] = defaultdict(list)


def _check_rate_limit(key: str) -> None:
    """Record one request for *key*, rejecting it if the key is over its limit.

    Timestamps older than ``_RATE_WINDOW`` seconds are dropped from the key's
    bucket first. A rejected request is not recorded.

    Args:
        key: Identifier the limit is tracked under.

    Raises:
        HTTPException: 429 when the bucket already holds ``_RATE_LIMIT``
            timestamps inside the window.
    """
    current = time.time()
    # Keep only the timestamps that still fall inside the window.
    recent = [stamp for stamp in _rate_buckets[key] if current - stamp < _RATE_WINDOW]
    _rate_buckets[key] = recent
    if len(recent) >= _RATE_LIMIT:
        raise HTTPException(status_code=429, detail="Too many requests. Try again later.")
    # `recent` is the list stored in the bucket, so this records the request.
    recent.append(current)

# Request body accepted by POST /api/auth/request.
class AuthRequest(BaseModel):
    # Address the token is generated for; declared as a plain str.
    email: str

# Request body accepted by POST /api/auth/verify.
class VerifyRequest(BaseModel):
    # Address the token was generated for; declared as a plain str.
    email: str
    # Token presented by the client, compared against the server-generated one.
    token: str

# POST /api/auth/request: rate-limit the caller, generate a token for the
# submitted email and acknowledge. The token is only included in the response
# when DEMO_MODE is on. This handler itself does not deliver anything by mail.
@router.post("/request")
async def request_magic_link(req: AuthRequest, request: Request):
    # The limiter is keyed by the client host, or "unknown" when the request
    # carries no client information.
    client = request.client
    _check_rate_limit(client.host if client else "unknown")
    token = _generate_token(req.email)
    if DEMO_MODE:
        return {"message": "Magic link sent", "demo_token": token}
    return {"message": "Magic link sent"}

# POST /api/auth/verify: check that the submitted token matches the one
# generated for the submitted email.
#
# Responds with {"email": ..., "verified": True} on success and raises
# HTTPException(401) when the token does not match.
@router.post("/verify")
async def verify_token(req: VerifyRequest):
    token = req.token
    # hmac.compare_digest raises TypeError when a str argument contains
    # non-ASCII characters. Generated tokens are hex digests, so a non-ASCII
    # token can never match and is rejected as a normal 401.
    if token.isascii():
        # Tokens are derived from the hour bucket, so one issued shortly before
        # the hour rolls over must still verify afterwards. Accept the current
        # and the previous hour, exactly as get_current_user does.
        for offset in (0, -1):
            expected = _generate_token_for_hour(req.email, offset)
            if hmac.compare_digest(token, expected):
                return {"email": req.email, "verified": True}
    raise HTTPException(status_code=401, detail="Invalid or expired token")

async def get_current_user(authorization: str = Header(default=None)) -> str:
    """FastAPI dependency: extract and verify email from Authorization header.

    Expected format: Bearer <email>:<token>

    The token is accepted when it matches the one generated for the email in
    either the current or the previous hour bucket.

    Args:
        authorization: Raw value of the Authorization header, or None when the
            header is absent.

    Returns:
        The verified email address.

    Raises:
        HTTPException: 401 when the header is missing, is not of the expected
            form, or the token does not match.
    """
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    # Split "<scheme> <payload>" on the first space only.
    scheme, space, payload = authorization.partition(" ")
    if not space or scheme != "Bearer":
        raise HTTPException(status_code=401, detail="Invalid authorization format")
    # Split "<email>:<token>" on the first colon only.
    email, colon, token = payload.partition(":")
    if not colon:
        raise HTTPException(status_code=401, detail="Invalid token format")
    # hmac.compare_digest raises TypeError when a str argument contains
    # non-ASCII characters. Generated tokens are hex digests, so a non-ASCII
    # token can never match and is rejected as a normal 401.
    if token.isascii():
        # Verify against current and previous hour (handle clock edge)
        for offset in (0, -1):
            expected = _generate_token_for_hour(email, offset)
            if hmac.compare_digest(token, expected):
                return email
    raise HTTPException(status_code=401, detail="Invalid or expired token")


def _generate_token_for_hour(email: str, hour_offset: int = 0) -> str:
    """Build the token for *email* in an hour bucket relative to now.

    The token is the first 32 hex characters of the SHA-256 digest of
    ``"<email>:<hour>:<SECRET>"``, where ``<hour>`` is the number of whole
    hours since the epoch plus ``hour_offset``.

    Args:
        email: Address the token is bound to.
        hour_offset: Bucket shift; 0 is the current hour, -1 the previous one.

    Returns:
        A 32-character lowercase hex string.
    """
    # NOTE(review): this is a plain hash over a secret-suffixed string rather
    # than hmac.new(); switching would change every issued token, so confirm
    # with the token consumers before changing the construction.
    hour_bucket = int(time.time() // 3600) + hour_offset
    digest = hashlib.sha256(f"{email}:{hour_bucket}:{SECRET}".encode())
    return digest.hexdigest()[:32]


def _generate_token(email: str) -> str:
    """Return the token for *email* in the current hour bucket."""
    return _generate_token_for_hour(email, hour_offset=0)