# Source path: ahlya/app/services/auth_service.py
# Last commit: 4f081c0 - "feat: sync backend app and services from local dev" (Ba7ath-Project)
import os
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
from jose import jwt as jose_jwt, JWTError
import jwt as pyjwt
from jwt.exceptions import InvalidTokenError as PyJWTInvalidTokenError
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
import base64
import json
import urllib.request
# FastAPI security scheme: pulls the bearer token out of the
# `Authorization` request header for the dependencies below.
security = HTTPBearer()

# Supabase settings, read once from the environment at import time.
SUPABASE_URL = os.environ.get("SUPABASE_URL", "")
SUPABASE_JWT_SECRET = os.environ.get("SUPABASE_JWT_SECRET")

# Fail fast at import: without the secret, tokens could not be verified.
if SUPABASE_JWT_SECRET is None or SUPABASE_JWT_SECRET == "":
    raise RuntimeError(
        "ERREUR FATALE: SUPABASE_JWT_SECRET est absent. Le décodage de JWT sans vérification de signature est strictement interdit (Standards Moez Elbey)."
    )

# Process-wide cache for the JWKS document fetched by _get_public_keys().
_JWKS_CACHE = {}
def _jwks_to_pem(jwk: dict) -> str:
    """Convert an EC P-256 (ES256) JSON Web Key into a PEM public key.

    Args:
        jwk: One entry of a JWKS ``keys`` list. It must have ``kty == "EC"``
            and carry base64url-encoded ``x`` and ``y`` coordinates.

    Returns:
        The public key as a PEM string in SubjectPublicKeyInfo format.

    Raises:
        ValueError: If the key is not an EC key, declares a curve other than
            P-256, or has a missing or undecodable coordinate.
    """
    if jwk.get("kty") != "EC":
        raise ValueError("Support ES256 (EC) uniquement")

    # The coordinates are always loaded onto SECP256R1 below, so a key that
    # explicitly declares another curve must be rejected rather than
    # misinterpreted. A missing "crv" is tolerated for backward compatibility.
    curve_name = jwk.get("crv", "P-256")
    if curve_name != "P-256":
        raise ValueError(f"Unsupported EC curve for ES256: {curve_name}")

    def _coordinate(name: str) -> int:
        """Decode one base64url coordinate of the JWK into an integer."""
        encoded = jwk.get(name)
        if not encoded or not isinstance(encoded, str):
            raise ValueError(f"JWK is missing the '{name}' coordinate")
        # JWK base64url values are unpadded; restore exactly what is needed.
        padded = encoded + "=" * (-len(encoded) % 4)
        return int.from_bytes(base64.urlsafe_b64decode(padded), byteorder="big")

    # Decode both coordinates before building any key object so malformed
    # input fails with a plain ValueError.
    x_value = _coordinate("x")
    y_value = _coordinate("y")

    public_numbers = ec.EllipticCurvePublicNumbers(
        x=x_value,
        y=y_value,
        curve=ec.SECP256R1(),
    )
    pem = public_numbers.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return pem.decode("utf-8")
def _get_public_keys() -> dict:
    """Return the Supabase JWKS document, fetching it on first use.

    The document is downloaded from
    ``{SUPABASE_URL}/auth/v1/.well-known/jwks.json`` and stored in the
    module-level ``_JWKS_CACHE``. Only a well-formed document (a JSON object
    with a ``keys`` list) is cached, so a malformed response cannot break
    every later call.

    Returns:
        The JWKS document, or an empty dict when it could not be fetched or
        was malformed (a warning is printed in that case).
    """
    global _JWKS_CACHE
    if _JWKS_CACHE:
        return _JWKS_CACHE

    # NOTE(review): the cache never expires, so keys rotated on the Supabase
    # side are only picked up after a process restart.
    try:
        url = f"{SUPABASE_URL}/auth/v1/.well-known/jwks.json"
        # Prefer the service-role key, falling back to the anon key.
        api_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY", "")
        req = urllib.request.Request(url)
        if api_key:
            req.add_header("apikey", api_key)
        with urllib.request.urlopen(req, timeout=5) as response:
            data = json.loads(response.read())

        # Callers do `jwks.get("keys", [])`; refuse to cache anything that
        # does not have that shape.
        if not isinstance(data, dict) or not isinstance(data.get("keys"), list):
            raise ValueError("unexpected JWKS payload (expected an object with a 'keys' list)")

        _JWKS_CACHE = data
        return _JWKS_CACHE
    except Exception as e:
        # Best effort: the caller treats an empty result as "no key found".
        print(f"WARNING: Impossible de récupérer les JWKS : {e}")
        return {}
class AuthenticatedUserInfo(BaseModel):
    # Identity and authorization flags of the caller, as built by
    # get_current_user from a validated Supabase JWT.
    user_id: str  # value of the token's "sub" claim
    email: Optional[str] = None  # value of the token's "email" claim, if present
    jwt: str  # the raw bearer token, kept so it can be passed on to the Supabase client
    is_admin: bool = False  # admin flag; defaults to non-admin
    is_active: bool = True  # account-enabled flag; defaults to active
def _decode_supabase_token(token: str) -> dict:
    """Verify a Supabase JWT and return its claims.

    The ``alg`` value of the unverified header only selects the verification
    path; each path then pins its own single algorithm.

    * ``HS256``: verified with ``SUPABASE_JWT_SECRET``.
    * ``ES256``: verified against the JWKS key whose ``kid`` matches the
      header, falling back to the first EC key of the set.

    The audience claim is not checked on either path. Expiry is no longer
    disabled on the ES256 path, so expired tokens are rejected there too.

    Raises:
        JWTError: Unsupported algorithm, no usable ES256 key, or a token
            rejected on the HS256 path.
        PyJWTInvalidTokenError: Token rejected on the ES256 path.
    """
    header = jose_jwt.get_unverified_header(token)
    alg = header.get("alg")
    kid = header.get("kid")

    if alg == "HS256":
        return jose_jwt.decode(
            token,
            SUPABASE_JWT_SECRET,
            algorithms=["HS256"],
            options={"verify_aud": False},
        )

    if alg == "ES256":
        keys = _get_public_keys().get("keys", [])
        jwk_key = next((key for key in keys if key.get("kid") == kid), None)
        if not jwk_key:
            # No kid match: fall back to the first EC key in the set.
            jwk_key = next((key for key in keys if key.get("kty") == "EC"), None)
        if not jwk_key:
            raise JWTError(f"No ES256 key found for kid {kid}")

        # PyJWT is given the key in PEM form.
        return pyjwt.decode(
            token,
            _jwks_to_pem(jwk_key),
            algorithms=["ES256"],
            # Only the audience check is relaxed; expiry must stay enforced.
            options={"verify_aud": False},
        )

    raise JWTError(f"Unsupported algorithm: {alg}")


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> AuthenticatedUserInfo:
    """Authenticate the caller from the Supabase JWT sent as a bearer token.

    The token is verified (HS256 with the shared secret, or ES256 via JWKS)
    and the user's flags are derived from its claims, then cross-checked
    against the ``users`` table. The raw token is attached to the result so
    it can be passed to the Supabase client to run queries as that user.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The authenticated user's id, email, raw token and admin/active flags.

    Raises:
        HTTPException: 401 when the token is invalid, expired, uses an
            unsupported algorithm, lacks a ``sub`` claim, or any other error
            occurs while authenticating.
    """
    token = credentials.credentials
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = _decode_supabase_token(token)

        user_id = payload.get("sub")
        if not user_id:
            raise credentials_exception

        # A claim may be present but null or not an object; treat as empty.
        app_metadata = payload.get("app_metadata")
        if not isinstance(app_metadata, dict):
            app_metadata = {}
        user_metadata = payload.get("user_metadata")
        if not isinstance(user_metadata, dict):
            user_metadata = {}

        # SECURITY: Supabase lets a user edit their own user_metadata, so it
        # must never grant privileges. Admin status comes only from
        # app_metadata (or the DB lookup below).
        is_admin = app_metadata.get("is_admin", False)
        # Either metadata source may disable the account; honouring
        # user_metadata here can only restrict access.
        is_active = not (
            app_metadata.get("is_active") is False
            or user_metadata.get("is_active") is False
        )

        # Fallback: consult the users table for admin status the token does
        # not carry, and to enforce a deactivation recorded there.
        # Best effort: a failed lookup keeps the token-derived flags.
        try:
            from app.core.supabase_client import get_admin_client
            admin_client = get_admin_client()
            db_res = admin_client.table("users").select("is_admin, is_active").eq("auth_user_id", user_id).execute()
            if db_res.data:
                db_user = db_res.data[0]
                if not is_admin and db_user.get("is_admin"):
                    is_admin = True
                    print(f"DEBUG AUTH: Fallback DB is_admin réussi pour {payload.get('email')}")
                if db_user.get("is_active") is False:
                    is_active = False
                    print(f"DEBUG AUTH: Fallback DB détecte is_active=False pour {payload.get('email')}")
        except Exception as e:
            print(f"WARNING: Echec du fallback DB auth : {e}")

        print(f"DEBUG AUTH: User {payload.get('email')} - is_admin final: {is_admin}, is_active final: {is_active}")
        return AuthenticatedUserInfo(
            user_id=user_id,
            email=payload.get("email"),
            jwt=token,
            is_admin=is_admin,
            is_active=is_active,
        )
    except HTTPException:
        # Our own 401 (missing "sub"): propagate without logging it as an
        # unknown error.
        raise
    except (JWTError, PyJWTInvalidTokenError) as e:
        print(f"JWT Validation error: {e}")
        raise credentials_exception
    except Exception as e:
        print(f"Unknown Auth error: {e}")
        raise credentials_exception
async def get_current_active_user(current_user: AuthenticatedUserInfo = Depends(get_current_user)):
    """Return the authenticated user if the account is active.

    Raises:
        HTTPException: 403 when ``current_user.is_active`` is false.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Account is disabled.",
    )
async def get_current_admin_user(current_user: AuthenticatedUserInfo = Depends(get_current_active_user)):
    """Return the active user if they hold admin privileges.

    NOTE: backend endpoints should rely on RLS policies when possible; this
    dependency still serves as a high-level gate for admin-only API routes.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is false.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="The user doesn't have enough privileges.",
    )