|
|
import os |
|
|
import json |
|
|
from typing import Dict, Optional |
|
|
from datetime import datetime, timedelta |
|
|
from jwt import PyJWKClient, decode, InvalidTokenError, get_unverified_header |
|
|
from fastapi import HTTPException, status |
|
|
from api.core.config import settings |
|
|
|
|
|
|
|
|
class SupabaseJWT: |
|
|
"""Handle Supabase JWT token verification and user extraction.""" |
|
|
|
|
|
def __init__(self, supabase_url: str): |
|
|
if not supabase_url: |
|
|
raise ValueError("SUPABASE_URL is not configured") |
|
|
|
|
|
self.supabase_url = supabase_url.rstrip('/') |
|
|
self.jwks_url = f"{self.supabase_url}/auth/v1/.well-known/jwks.json" |
|
|
self._jwk_client: Optional[PyJWKClient] = None |
|
|
print(f"[DEBUG] Initialized SupabaseJWT with URL: {self.supabase_url}") |
|
|
print(f"[DEBUG] JWKS URL: {self.jwks_url}") |
|
|
|
|
|
@property |
|
|
def jwk_client(self) -> PyJWKClient: |
|
|
"""Lazily initialize and cache the JWK client.""" |
|
|
if self._jwk_client is None: |
|
|
try: |
|
|
self._jwk_client = PyJWKClient(self.jwks_url) |
|
|
print("[DEBUG] PyJWKClient initialized successfully") |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to initialize PyJWKClient: {e}") |
|
|
raise |
|
|
return self._jwk_client |
|
|
|
|
|
def verify_token(self, token: str) -> Dict: |
|
|
""" |
|
|
Verify a Supabase JWT token and return the payload. |
|
|
|
|
|
Args: |
|
|
token: JWT token string |
|
|
|
|
|
Returns: |
|
|
Decoded token payload |
|
|
|
|
|
Raises: |
|
|
HTTPException: If token is invalid or expired |
|
|
""" |
|
|
try: |
|
|
|
|
|
unverified_header = get_unverified_header(token) |
|
|
print(f"[DEBUG] Token algorithm: {unverified_header.get('alg')}") |
|
|
print(f"[DEBUG] Token kid: {unverified_header.get('kid')}") |
|
|
|
|
|
|
|
|
try: |
|
|
signing_key = self.jwk_client.get_signing_key_from_jwt(token) |
|
|
print(f"[DEBUG] Successfully retrieved signing key") |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to get signing key: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail=f"Could not retrieve signing key: {str(e)}", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
|
|
|
|
|
|
payload = decode( |
|
|
token, |
|
|
signing_key.key, |
|
|
algorithms=["RS256", "HS256", "ES256"], |
|
|
options={"verify_aud": False}, |
|
|
) |
|
|
print("[DEBUG] Token verified successfully") |
|
|
return payload |
|
|
|
|
|
except InvalidTokenError as e: |
|
|
print(f"[ERROR] Invalid token: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail=f"Invalid authentication credentials: {str(e)}", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
except HTTPException: |
|
|
raise |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Token verification failed: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Could not validate credentials", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
|
|
|
def extract_user(self, payload: Dict) -> Dict: |
|
|
""" |
|
|
Extract user information from token payload. |
|
|
|
|
|
Args: |
|
|
payload: Decoded JWT payload |
|
|
|
|
|
Returns: |
|
|
User object with id, email, role |
|
|
""" |
|
|
return { |
|
|
"id": payload.get("sub"), |
|
|
"email": payload.get("email"), |
|
|
"role": payload.get("role") or payload.get("app_metadata", {}).get("role", "user"), |
|
|
"phone": payload.get("phone"), |
|
|
"user_metadata": payload.get("user_metadata", {}), |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
supabase_jwt = None |
|
|
try: |
|
|
if settings.supabase_url: |
|
|
supabase_jwt = SupabaseJWT(settings.supabase_url) |
|
|
else: |
|
|
print("[WARNING] SUPABASE_URL not configured") |
|
|
except Exception as e: |
|
|
print(f"[ERROR] Failed to initialize Supabase JWT: {e}") |
|
|
|
|
|
|
|
|
def verify_supabase_token(token: str) -> Dict: |
|
|
"""Verify and decode a Supabase JWT token.""" |
|
|
if not supabase_jwt: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail="Supabase not configured", |
|
|
) |
|
|
return supabase_jwt.verify_token(token) |
|
|
|
|
|
|
|
|
def extract_user_from_token(payload: Dict) -> Dict: |
|
|
"""Extract user info from token payload.""" |
|
|
if not supabase_jwt: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail="Supabase not configured", |
|
|
) |
|
|
user = supabase_jwt.extract_user(payload) |
|
|
if not user.get("id"): |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid token payload", |
|
|
) |
|
|
return user |
|
|
|