ahlya / app /api /v1 /auth.py
Ba7ath-Project's picture
Fix: Fully implement and enforce is_active status globally
86a4017
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Optional, List
from app.core.supabase_client import get_user_client, get_admin_client
from app.services.auth_service import get_current_active_user, get_current_admin_user, AuthenticatedUserInfo
# Router on which the endpoints of this module are registered through the
# ``@router.<method>`` decorators below.
router = APIRouter()
class UserCreate(BaseModel):
    """Request body accepted by the user-creation endpoint (``POST /users``)."""

    # Address the new account is created with.
    email: str
    # Initial password, forwarded unchanged to the Supabase Admin API.
    password: str
    # Display name; stored under ``full_name`` in the account's user metadata.
    full_name: str
    # Administrator flag stored in user metadata; a regular user by default.
    is_admin: bool = False
    # Activation flag stored in user metadata; accounts start out active.
    is_active: bool = True
class UserUpdate(BaseModel):
    """Partial-update body accepted by ``PATCH /users/{user_id}``.

    Every field is optional. A field left as ``None`` is not touched by the
    update endpoint; an empty ``password`` is ignored by it as well.
    """

    # New e-mail address, or None to keep the current one.
    email: Optional[str] = None
    # New password, or None / empty to keep the current one.
    password: Optional[str] = None
    # New display name written to user metadata, or None to keep it.
    full_name: Optional[str] = None
    # New administrator flag, or None to keep it.
    is_admin: Optional[bool] = None
    # New activation flag, or None to keep it.
    is_active: Optional[bool] = None
@router.get("/me")
async def read_users_me(
    current_user: AuthenticatedUserInfo = Depends(get_current_active_user),
):
    """Return the caller's own row from the ``users`` table.

    The lookup goes through a Supabase client built from the caller's JWT
    (not the admin client), so the query is issued on behalf of the user; the
    intent is that Supabase applies Row Level Security (RLS) to it.

    Returns:
        The ``data`` payload of the single-row query on ``users`` filtered by
        ``auth_user_id`` equal to the caller's user id.

    Raises:
        HTTPException: status 400 carrying the error text when any step of
            the lookup fails (for instance a PostgREST error when the row
            cannot be read).
    """
    try:
        # Client scoped to the caller: it carries the user's JWT.
        user_scoped_client = get_user_client(current_user.jwt)

        # Exactly one row is requested: the one whose auth_user_id is the
        # caller's id.
        own_row_query = (
            user_scoped_client.table("users")
            .select("*")
            .eq("auth_user_id", current_user.user_id)
            .single()
        )
        result = own_row_query.execute()
        return result.data
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
# Number of users requested per call to the Supabase Admin API.
_USERS_PAGE_SIZE = 100
# Hard bound on the number of pages fetched, so the listing loop always ends.
_MAX_USER_PAGES = 100


def _extract_users(response):
    """Return the user objects carried by an admin ``list_users`` response.

    The client may hand back either a plain list of users or an object that
    exposes the list as a ``users`` attribute; a missing or ``None``
    attribute is treated as an empty list.
    """
    if isinstance(response, list):
        return response
    return getattr(response, "users", []) or []


def _serialize_user(u):
    """Convert one Supabase auth user object into the dict returned by the API."""
    metadata = getattr(u, "user_metadata", {}) or {}
    app_metadata = getattr(u, "app_metadata", {}) or {}

    # user_metadata wins; app_metadata is the fallback, and a user with no
    # flag anywhere is reported as active.
    is_active = metadata.get("is_active")
    if is_active is None:
        is_active = app_metadata.get("is_active", True)

    return {
        "id": getattr(u, "id", "inconnu"),
        "email": getattr(u, "email", "inconnu"),
        "full_name": metadata.get("full_name", ""),
        # Admin only when one of the two metadata sources holds the literal True.
        "is_admin": metadata.get("is_admin", False) is True
        or app_metadata.get("is_admin", False) is True,
        "is_active": is_active,
        "created_at": str(getattr(u, "created_at", "")),
    }


@router.get("/users")
async def list_users(current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user)):
    """List the Supabase auth users through the Admin API.

    The Admin API returns users one page at a time (50 per page by default
    according to the Supabase documentation), so a single un-paginated call
    silently omits every user past the first page. Pages are therefore
    requested in turn until one comes back shorter than the page size, up to
    ``_MAX_USER_PAGES`` pages.

    Returns:
        A list of dicts with the keys ``id``, ``email``, ``full_name``,
        ``is_admin``, ``is_active`` and ``created_at``.

    Raises:
        HTTPException: status 400 carrying the error text when the listing
            fails.
    """
    try:
        admin_client = get_admin_client()
        users = []
        for page in range(1, _MAX_USER_PAGES + 1):
            response = admin_client.auth.admin.list_users(
                page=page, per_page=_USERS_PAGE_SIZE
            )
            batch = _extract_users(response)
            users.extend(_serialize_user(u) for u in batch)
            # A short (or empty) page means the last page has been reached.
            if len(batch) < _USERS_PAGE_SIZE:
                break
        return users
    except Exception as e:
        print(f"ERROR: Echec list_users : {str(e)}")
        raise HTTPException(
            status_code=400,
            detail=f"Erreur Supabase Admin: {str(e)}"
        ) from e
@router.post("/users")
async def create_user(
    user: UserCreate,
    current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user),
):
    """Create an auth account through the Supabase Admin API.

    The request is sent with ``email_confirm`` set to ``True`` and with the
    name, admin flag and activation flag placed in the account's user
    metadata.

    Returns:
        A dict with the ``id`` and ``email`` of the created user.

    Raises:
        HTTPException: status 400 carrying the error text when the creation
            fails.
    """
    try:
        admin_client = get_admin_client()

        # NOTE(review): Supabase documents user_metadata as editable by the
        # end user; confirm that authorization does not rely on these flags
        # alone.
        profile_metadata = {
            "full_name": user.full_name,
            "is_admin": user.is_admin,
            "is_active": user.is_active,
        }
        new_account = {
            "email": user.email,
            "password": user.password,
            "email_confirm": True,
            "user_metadata": profile_metadata,
        }

        created = admin_client.auth.admin.create_user(new_account)
        return {"id": created.user.id, "email": created.user.email}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.patch("/users/{user_id}")
async def update_user(
    user_id: str,
    user_update: UserUpdate,
    current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user),
):
    """Apply a partial update to an auth account and mirror its flags.

    Only the fields actually provided are sent to the Supabase Admin API.
    When ``is_admin`` and/or ``is_active`` are provided they are also written
    to the ``users`` table row whose ``auth_user_id`` matches; that second
    write is best effort: a failure is printed as a warning and does not
    fail the request.

    Returns:
        A dict with the ``id`` of the updated user.

    Raises:
        HTTPException: status 400 carrying the error text when the Admin API
            update (or anything outside the best-effort mirror) fails.
    """
    try:
        admin_client = get_admin_client()

        # Metadata entries, in a fixed order, keeping only the provided ones.
        metadata_patch = {
            key: value
            for key, value in (
                ("full_name", user_update.full_name),
                ("is_admin", user_update.is_admin),
                ("is_active", user_update.is_active),
            )
            if value is not None
        }

        attributes = {}
        if user_update.email is not None:
            attributes["email"] = user_update.email
        # Truthiness check on purpose: an empty password is skipped like None.
        if user_update.password:
            attributes["password"] = user_update.password
        if metadata_patch:
            attributes["user_metadata"] = metadata_patch

        response = admin_client.auth.admin.update_user_by_id(user_id, attributes)

        # Mirror the two flags into the users table so both stores agree.
        public_updates = {
            key: metadata_patch[key]
            for key in ("is_admin", "is_active")
            if key in metadata_patch
        }
        if public_updates:
            try:
                (
                    admin_client.table("users")
                    .update(public_updates)
                    .eq("auth_user_id", user_id)
                    .execute()
                )
            except Exception as e:
                print(f"WARNING: Could not update public.users from auth API: {e}")

        return {"id": response.user.id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: str,
    current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user),
):
    """Delete an auth account through the Supabase Admin API.

    Returns:
        ``{"status": "success"}`` once the Admin API call has returned.

    Raises:
        HTTPException: status 400 carrying the error text when the deletion
            fails.
    """
    try:
        get_admin_client().auth.admin.delete_user(user_id)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"status": "success"}