File size: 2,479 Bytes
bbe01fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
import time
from app.core.config import get_settings

# auto_error=False so we can raise 401 (Unauthorized) ourselves when no token
# is provided. Without this, FastAPI's HTTPBearer raises 403 (Forbidden) for
# missing credentials, which is semantically incorrect.
security = HTTPBearer(auto_error=False)


def verify_jwt(
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> dict:
    """
    Decodes the Bearer token provided by the PersonaBot API / SSE client.
    Must perfectly match the JWT_SECRET and algorithm configured in .env.
    Typically, this is generated by the user's external Vercel/NextJS routing layer.
    """
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authorization header required.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    settings = get_settings()
    token = credentials.credentials

    # If we are in local testing and no secret is set, we might bypass,
    # but for production parity, we strictly enforce it unless explicitly configured.
    if not settings.JWT_SECRET:
         # In a true 0$ prod deployment, failing to set a secret drops all traffic.
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail="JWT_SECRET is not configured on the server."
         )

    try:
        # Decode the token securely
        payload = jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM]
        )

        # Verify expiration
        exp = payload.get("exp")
        if not exp or time.time() > exp:
             raise HTTPException(
                 status_code=status.HTTP_401_UNAUTHORIZED,
                 detail="Token has expired.",
                 headers={"WWW-Authenticate": "Bearer"},
             )

        # Optional: You can add `aud` (audience) or `iss` (issuer) validations here.
        # But this basic structurally sound signature & expiry check stops scraping.

        return payload

    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Could not validate credentials: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        )