Spaces:
Sleeping
Sleeping
| import os | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| from jose import jwt as jose_jwt, JWTError | |
| import jwt as pyjwt | |
| from jwt.exceptions import InvalidTokenError as PyJWTInvalidTokenError | |
| from cryptography.hazmat.primitives.asymmetric import ec | |
| from cryptography.hazmat.primitives import serialization | |
| import base64 | |
| import json | |
| import urllib.request | |
# HTTPBearer automatically extracts the Bearer token from the Authorization header
security = HTTPBearer()

# Shared secret used to verify HS256-signed tokens in get_current_user().
SUPABASE_JWT_SECRET = os.getenv("SUPABASE_JWT_SECRET")
# Base URL of the Supabase project; _get_public_keys() builds the JWKS URL from it.
SUPABASE_URL = os.getenv("SUPABASE_URL", "")

# Fail fast at import time: the module refuses to load without the signing
# secret rather than ever decoding a token without signature verification.
if not SUPABASE_JWT_SECRET:
    raise RuntimeError("ERREUR FATALE: SUPABASE_JWT_SECRET est absent. Le décodage de JWT sans vérification de signature est strictement interdit (Standards Moez Elbey).")

# Process-wide cache of the JWKS document; populated by _get_public_keys()
# after a successful fetch and reused by every later call.
_JWKS_CACHE = {}
| def _jwks_to_pem(jwk: dict) -> str: | |
| """Convertit une clé JWKS ES256 en clé PEM.""" | |
| if jwk.get("kty") != "EC": | |
| raise ValueError("Support ES256 (EC) uniquement") | |
| x_b64 = jwk.get("x", "") | |
| y_b64 = jwk.get("y", "") | |
| x = base64.urlsafe_b64decode(x_b64 + "==") | |
| y = base64.urlsafe_b64decode(y_b64 + "==") | |
| public_numbers = ec.EllipticCurvePublicNumbers( | |
| x=int.from_bytes(x, byteorder="big"), | |
| y=int.from_bytes(y, byteorder="big"), | |
| curve=ec.SECP256R1() | |
| ) | |
| public_key = public_numbers.public_key() | |
| pem = public_key.public_bytes( | |
| encoding=serialization.Encoding.PEM, | |
| format=serialization.PublicFormat.SubjectPublicKeyInfo | |
| ) | |
| return pem.decode("utf-8") | |
def _get_public_keys() -> dict:
    """Return the Supabase JWKS document, fetching and caching it on first use.

    The document is downloaded from
    ``{SUPABASE_URL}/auth/v1/.well-known/jwks.json`` and stored in the
    module-level ``_JWKS_CACHE``. Only a well-formed document (a JSON object
    holding a ``keys`` list) is cached; an error payload or a failed request
    is never cached, so the next call tries again.

    Returns:
        The JWKS dict, or ``{}`` when it could not be fetched or is malformed.
    """
    global _JWKS_CACHE
    if _JWKS_CACHE:
        return _JWKS_CACHE

    try:
        url = f"{SUPABASE_URL}/auth/v1/.well-known/jwks.json"
        api_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY", "")
        request = urllib.request.Request(url)
        if api_key:
            request.add_header("apikey", api_key)
        with urllib.request.urlopen(request, timeout=5) as response:
            data = json.loads(response.read())
    except Exception as e:
        # Best effort: callers treat an empty dict as "no key available".
        print(f"WARNING: Impossible de récupérer les JWKS : {e}")
        return {}

    # Do not poison the cache with something that is not a JWKS document
    # (e.g. a JSON error object): it would be served until the next restart.
    if not isinstance(data, dict) or not isinstance(data.get("keys"), list):
        print("WARNING: Réponse JWKS invalide (liste 'keys' absente), non mise en cache")
        return {}

    _JWKS_CACHE = data
    return _JWKS_CACHE
class AuthenticatedUserInfo(BaseModel):
    """Identity and access flags of the caller, as built by ``get_current_user``."""

    # Value of the token's ``sub`` claim.
    user_id: str
    # Value of the token's ``email`` claim, when the token carries one.
    email: Optional[str] = None
    # The raw bearer token exactly as it was received.
    jwt: str
    # True when the caller is flagged as an administrator.
    is_admin: bool = False
    # False when the account is flagged as disabled.
    is_active: bool = True
def _decode_supabase_token(token: str) -> dict:
    """Verify a Supabase JWT (signature and expiry) and return its claims.

    The algorithm is read from the unverified header only to pick the
    verification path; each path then pins its own single allowed algorithm.

    Raises:
        JWTError: Unsupported algorithm, no usable ES256 key, or an invalid
            HS256 token.
        PyJWTInvalidTokenError: Invalid or expired ES256 token.
    """
    header = jose_jwt.get_unverified_header(token)
    alg = header.get("alg")
    kid = header.get("kid")

    if alg == "HS256":
        # Symmetric path: verified with the shared project secret.
        return jose_jwt.decode(
            token,
            SUPABASE_JWT_SECRET,
            algorithms=["HS256"],
            options={"verify_aud": False},
        )

    if alg == "ES256":
        # Asymmetric path: look the key up in the project's JWKS document.
        keys = _get_public_keys().get("keys", [])
        jwk_key = next((key for key in keys if key.get("kid") == kid), None)
        if not jwk_key:
            # No kid match: fall back to the first EC key. The signature is
            # still verified, so a wrong key simply fails validation.
            jwk_key = next((key for key in keys if key.get("kty") == "EC"), None)
        if not jwk_key:
            raise JWTError(f"No ES256 key found for kid {kid}")

        # Convert JWKS to PEM for PyJWT
        verification_key = _jwks_to_pem(jwk_key)
        # Expiry verification stays enabled (PyJWT default): an expired token
        # must be rejected here exactly like on the HS256 path.
        return pyjwt.decode(
            token,
            verification_key,
            algorithms=["ES256"],
            options={"verify_aud": False},
        )

    raise JWTError(f"Unsupported algorithm: {alg}")


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> AuthenticatedUserInfo:
    """Validate the bearer Supabase JWT and describe the authenticated caller.

    Supports HS256 (shared secret) and ES256 (public key taken from the
    project's JWKS). The raw token is attached to the returned object so it
    can be forwarded to a Supabase client acting as that user.

    ``is_admin`` comes from ``app_metadata`` or from the ``users`` table; it
    is never taken from ``user_metadata``. ``is_active`` is False as soon as
    ``app_metadata``, ``user_metadata`` or the ``users`` row says so.

    NOTE(review): the JWKS fetch and the ``users`` lookup are synchronous
    calls made from this coroutine.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The caller's id, email, raw token and ``is_admin`` / ``is_active`` flags.

    Raises:
        HTTPException: 401 when the token is missing a ``sub`` claim, is
            invalid or expired, or any unexpected error occurs.
    """
    token = credentials.credentials
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = _decode_supabase_token(token)

        user_id = payload.get("sub")
        if not user_id:
            raise credentials_exception

        # A claim can be present but null; treat that like an absent claim.
        app_metadata = payload.get("app_metadata") or {}
        user_metadata = payload.get("user_metadata") or {}

        # SECURITY: Supabase lets end users edit their own user_metadata, so
        # it must never grant privileges. Only app_metadata may set is_admin.
        is_admin = app_metadata.get("is_admin", False)
        # Either metadata source may only *disable* the account.
        is_active = not (
            app_metadata.get("is_active") is False
            or user_metadata.get("is_active") is False
        )

        # FALLBACK: check the users table for admin status missing from the
        # token and to strictly verify the active status. Deliberately best
        # effort: a lookup failure keeps the token-derived flags.
        try:
            from app.core.supabase_client import get_admin_client
            admin_client = get_admin_client()
            db_res = admin_client.table("users").select("is_admin, is_active").eq("auth_user_id", user_id).execute()
            if db_res.data:
                db_user = db_res.data[0]
                if not is_admin and db_user.get("is_admin"):
                    is_admin = True
                    print(f"DEBUG AUTH: Fallback DB is_admin réussi pour {payload.get('email')}")
                if db_user.get("is_active") is False:
                    is_active = False
                    print(f"DEBUG AUTH: Fallback DB détecte is_active=False pour {payload.get('email')}")
        except Exception as e:
            print(f"WARNING: Echec du fallback DB auth : {e}")

        print(f"DEBUG AUTH: User {payload.get('email')} - is_admin final: {is_admin}, is_active final: {is_active}")
        return AuthenticatedUserInfo(
            user_id=user_id,
            email=payload.get("email"),
            jwt=token,
            is_admin=is_admin,
            is_active=is_active,
        )
    except HTTPException:
        # Our own 401 (missing "sub"): propagate without mislabelling it below.
        raise
    except (JWTError, PyJWTInvalidTokenError) as e:
        print(f"JWT Validation error: {e}")
        raise credentials_exception from e
    except Exception as e:
        print(f"Unknown Auth error: {e}")
        raise credentials_exception from e
async def get_current_active_user(current_user: AuthenticatedUserInfo = Depends(get_current_user)):
    """Dependency that only lets enabled accounts through.

    Returns the user resolved by ``get_current_user`` unchanged when its
    ``is_active`` flag is truthy.

    Raises:
        HTTPException: 403 when the account is not active.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account is disabled.",
    )
async def get_current_admin_user(current_user: AuthenticatedUserInfo = Depends(get_current_active_user)):
    """Dependency that restricts a route to active administrators.

    Builds on ``get_current_active_user`` and returns the same user object
    when its ``is_admin`` flag is truthy.

    NOTE: backend endpoints are meant to rely on RLS policies where possible;
    this check is the coarse gate for admin-only API routes.

    Raises:
        HTTPException: 403 when the user is not an administrator.
    """
    if current_user.is_admin:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="The user doesn't have enough privileges.",
    )