"""Supabase JWT verification (Batch C1). Matrix Builder uses Supabase Auth; the FastAPI API verifies the bearer JWT itself (HS256 with the project's JWT secret) and extracts the user id (``sub``). That user id is what scopes every row via RLS — see ``app.db.engine.session_scope``. """ from __future__ import annotations import jwt from fastapi import Depends, Header, HTTPException, status from app.core.config import Settings, get_settings class AuthError(Exception): """Raised when a token is missing, malformed, expired, or fails verification.""" def verify_token(token: str, settings: Settings | None = None) -> dict: """Verify a Supabase JWT and return its claims, or raise ``AuthError``.""" settings = settings or get_settings() try: return jwt.decode( token, settings.supabase_jwt_secret, algorithms=[settings.supabase_jwt_algorithm], audience=settings.supabase_jwt_audience, options={"require": ["sub", "exp"]}, ) except jwt.PyJWTError as exc: # expired, bad signature, wrong audience, missing claims raise AuthError(str(exc)) from exc def user_id_from_token(token: str, settings: Settings | None = None) -> str: claims = verify_token(token, settings) sub = claims.get("sub") if not sub: raise AuthError("token has no subject") return str(sub) def _bearer(authorization: str | None) -> str: if not authorization or not authorization.lower().startswith("bearer "): raise AuthError("missing bearer token") return authorization.split(" ", 1)[1].strip() def current_user_id( authorization: str | None = Header(default=None), settings: Settings = Depends(get_settings), ) -> str: """FastAPI dependency: the authenticated Supabase user id (raises 401 otherwise).""" try: return user_id_from_token(_bearer(authorization), settings) except AuthError as exc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing authentication token.", headers={"WWW-Authenticate": "Bearer"}, ) from exc def current_claims( authorization: str | None = Header(default=None), settings: Settings = Depends(get_settings), ) -> dict: """FastAPI dependency: the full verified token claims (sub, email, …), or 401.""" try: return verify_token(_bearer(authorization), settings) except AuthError as exc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing authentication token.", headers={"WWW-Authenticate": "Bearer"}, ) from exc # Sentinel owner for unauthenticated callers — lets guest/demo flows persist runs # without forcing login, while authenticated users get their own isolated history. GUEST_OWNER_ID = "guest" def optional_user_id( authorization: str | None = Header(default=None), settings: Settings = Depends(get_settings), ) -> str: """FastAPI dependency: the authenticated user id, or ``GUEST_OWNER_ID``. Never raises — used by routes that should work signed-out but still scope persisted history per user when a valid token is present. """ try: return user_id_from_token(_bearer(authorization), settings) except AuthError: return GUEST_OWNER_ID __all__ = [ "AuthError", "verify_token", "user_id_from_token", "current_user_id", "current_claims", "optional_user_id", "GUEST_OWNER_ID", ]