Spaces:
Running
Running
| from fastapi import APIRouter, Depends, HTTPException | |
| from pydantic import BaseModel | |
| from typing import Optional, List | |
| from app.core.supabase_client import get_user_client, get_admin_client | |
| from app.services.auth_service import get_current_active_user, get_current_admin_user, AuthenticatedUserInfo | |
| router = APIRouter() | |
| class UserCreate(BaseModel): | |
| email: str | |
| password: str | |
| full_name: str | |
| is_admin: bool = False | |
| is_active: bool = True | |
| class UserUpdate(BaseModel): | |
| email: Optional[str] = None | |
| password: Optional[str] = None | |
| full_name: Optional[str] = None | |
| is_admin: Optional[bool] = None | |
| is_active: Optional[bool] = None | |
| async def read_users_me( | |
| current_user: AuthenticatedUserInfo = Depends(get_current_active_user) | |
| ): | |
| """ | |
| Example endpoint showing how to query information on behalf of the user, | |
| forcing Supabase to apply Row Level Security (RLS) via their JWT. | |
| """ | |
| try: | |
| # Initialize client with the user's JWT | |
| client = get_user_client(current_user.jwt) | |
| # This will securely return only the row matching `auth.uid() = auth_user_id` | |
| response = ( | |
| client.table("users") | |
| .select("*") | |
| .eq("auth_user_id", current_user.user_id) | |
| .single() | |
| .execute() | |
| ) | |
| return response.data | |
| except Exception as e: | |
| # Note: If RLS prevents reading, Supabase might return a PostgREST error. | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def list_users(current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user)): | |
| try: | |
| admin_client = get_admin_client() | |
| # On tente de lister les utilisateurs via l'API Admin de Supabase | |
| response = admin_client.auth.admin.list_users() | |
| users = [] | |
| # Supabase-py retourne généralement une liste d'objets User | |
| # On vérifie si c'est une liste ou un objet contenant une liste | |
| user_list = response if isinstance(response, list) else getattr(response, 'users', []) | |
| for u in user_list: | |
| metadata = getattr(u, "user_metadata", {}) or {} | |
| app_metadata = getattr(u, "app_metadata", {}) or {} | |
| is_active = metadata.get("is_active") | |
| if is_active is None: | |
| is_active = app_metadata.get("is_active", True) | |
| users.append({ | |
| "id": getattr(u, "id", "inconnu"), | |
| "email": getattr(u, "email", "inconnu"), | |
| "full_name": metadata.get("full_name", ""), | |
| "is_admin": metadata.get("is_admin", False) is True or app_metadata.get("is_admin", False) is True, | |
| "is_active": is_active, | |
| "created_at": str(getattr(u, "created_at", "")) | |
| }) | |
| return users | |
| except Exception as e: | |
| print(f"ERROR: Echec list_users : {str(e)}") | |
| raise HTTPException( | |
| status_code=400, | |
| detail=f"Erreur Supabase Admin: {str(e)}" | |
| ) | |
| async def create_user( | |
| user: UserCreate, | |
| current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user) | |
| ): | |
| try: | |
| admin_client = get_admin_client() | |
| response = admin_client.auth.admin.create_user({ | |
| "email": user.email, | |
| "password": user.password, | |
| "email_confirm": True, | |
| "user_metadata": { | |
| "full_name": user.full_name, | |
| "is_admin": user.is_admin, | |
| "is_active": user.is_active | |
| } | |
| }) | |
| return {"id": response.user.id, "email": response.user.email} | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def update_user( | |
| user_id: str, | |
| user_update: UserUpdate, | |
| current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user) | |
| ): | |
| try: | |
| admin_client = get_admin_client() | |
| attributes = {} | |
| if user_update.email is not None: | |
| attributes["email"] = user_update.email | |
| if user_update.password: | |
| attributes["password"] = user_update.password | |
| user_metadata = {} | |
| if user_update.full_name is not None: | |
| user_metadata["full_name"] = user_update.full_name | |
| if user_update.is_admin is not None: | |
| user_metadata["is_admin"] = user_update.is_admin | |
| if user_update.is_active is not None: | |
| user_metadata["is_active"] = user_update.is_active | |
| if user_metadata: | |
| attributes["user_metadata"] = user_metadata | |
| response = admin_client.auth.admin.update_user_by_id(user_id, attributes) | |
| # Update public.users database dynamically | |
| public_updates = {} | |
| if user_update.is_admin is not None: | |
| public_updates["is_admin"] = user_update.is_admin | |
| if user_update.is_active is not None: | |
| public_updates["is_active"] = user_update.is_active | |
| if public_updates: | |
| try: | |
| admin_client.table("users").update(public_updates).eq("auth_user_id", user_id).execute() | |
| except Exception as e: | |
| print(f"WARNING: Could not update public.users from auth API: {e}") | |
| return {"id": response.user.id} | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| async def delete_user( | |
| user_id: str, | |
| current_admin: AuthenticatedUserInfo = Depends(get_current_admin_user) | |
| ): | |
| try: | |
| admin_client = get_admin_client() | |
| admin_client.auth.admin.delete_user(user_id) | |
| return {"status": "success"} | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |