Spaces:
Paused
Paused
| from fastapi import APIRouter, HTTPException, status, Depends | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from typing import List, Optional | |
| from ..db.database import get_db | |
| from ..db.models import User | |
| from ..db.schemas import UserCreate, UserInDB | |
| from ..core.dependencies import get_current_superuser, get_current_active_user | |
| from ..core.security import get_password_hash | |
# Module-level APIRouter instance.
# NOTE(review): no route decorators are visible on the handlers in this view —
# presumably lost in extraction; confirm how the handlers are registered.
| router = APIRouter() | |
async def read_user_me(current_user: User = Depends(get_current_active_user)):
    """Return the user supplied by the ``get_current_active_user`` dependency.

    The handler does no work of its own: the injected ``current_user``
    object is handed back unchanged.
    """
    resolved_user = current_user
    return resolved_user
async def list_users(
    skip: int = 0,
    limit: int = 10,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> List[UserInDB]:
    """Return one page of users in a stable order.

    Args:
        skip: Number of rows to skip before the page starts; must be >= 0.
        limit: Maximum number of rows in the page; must be >= 0.
        current_user: Injected through ``get_current_superuser``; not read
            in the body, it only gates access via the dependency.
        db: Async session used to run the query.

    Returns:
        The ``User`` rows selected for the requested page.

    Raises:
        HTTPException: 400 when ``skip`` or ``limit`` is negative.
    """
    # Reject negative paging values up front instead of passing them to the
    # database as OFFSET/LIMIT.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative"
        )

    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows between requests; order by the primary key.
    stmt = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()
async def create_user(
    user: UserCreate,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> UserInDB:
    """Create a new user from the submitted payload.

    Args:
        user: Payload carrying email, username, full name, plain password,
            active/superuser flags and roles.
        current_user: Injected through ``get_current_superuser``; not read
            in the body, it only gates access via the dependency.
        db: Async session used for the lookup and the insert.

    Returns:
        The newly persisted ``User`` row, refreshed after commit.

    Raises:
        HTTPException: 400 when the email is already registered, or when the
            insert is rejected by a database integrity constraint.
    """
    # Local import: the file already depends on SQLAlchemy, and this keeps the
    # block self-contained without touching the module's import section.
    from sqlalchemy.exc import IntegrityError

    # Check if email exists
    stmt = select(User).where(User.email == user.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Create new user; only the hash of the password is stored.
    db_user = User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        roles=user.roles
    )
    db.add(db_user)
    try:
        await db.commit()
    except IntegrityError as exc:
        # The pre-check above cannot rule out a concurrent insert (or another
        # constraint on the table); roll back so the session stays usable and
        # report a client error rather than letting the exception propagate.
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User could not be created due to a database integrity conflict"
        ) from exc
    await db.refresh(db_user)
    return db_user
async def update_user(
    user_id: int,
    user_update: UserCreate,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> UserInDB:
    """Update an existing user with the fields set in the payload.

    Args:
        user_id: Primary key of the user to update.
        user_update: Payload; only fields explicitly set on it are applied.
        current_user: Injected through ``get_current_superuser``; not read
            in the body, it only gates access via the dependency.
        db: Async session used for the lookups and the update.

    Returns:
        The updated ``User`` row, refreshed after commit.

    Raises:
        HTTPException: 404 when no user has ``user_id``; 400 when the new
            email already belongs to a different user.
    """
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalar_one_or_none()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Update user fields
    update_data = user_update.dict(exclude_unset=True)

    # Mirror create_user's duplicate-email check so that changing the email to
    # one owned by another user yields the same 400 response. The row being
    # updated is excluded so that re-submitting its own email is allowed.
    new_email = update_data.get("email")
    if new_email is not None and new_email != db_user.email:
        clash_stmt = select(User).where(User.email == new_email, User.id != user_id)
        clash = await db.execute(clash_stmt)
        if clash.scalars().first():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already registered"
            )

    # Never store the plain password: swap it for its hash under the
    # attribute name used when the user is created.
    if "password" in update_data:
        update_data["hashed_password"] = get_password_hash(update_data.pop("password"))

    for field, value in update_data.items():
        setattr(db_user, field, value)

    await db.commit()
    await db.refresh(db_user)
    return db_user
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
):
    """Delete the user identified by ``user_id``.

    Looks the row up by primary key, raises a 404 ``HTTPException`` when it
    is absent, otherwise deletes it, commits, and returns a small
    confirmation payload.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()

    if target:
        await db.delete(target)
        await db.commit()
        confirmation = {"status": "success", "message": "User deleted"}
        return confirmation

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
async def update_user_roles(
    user_id: int,
    roles: List[str],
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> UserInDB:
    """Replace the role list of the user identified by ``user_id``.

    The requested roles are validated first: any role outside
    "user", "admin", "manager", "support" triggers a 400 ``HTTPException``
    naming the offending entries. Only then is the user looked up (404 when
    missing), its ``roles`` attribute overwritten, and the change committed.
    Returns the refreshed ``User`` row.
    """
    allowed = ("user", "admin", "manager", "support")

    # Collect rejected roles in the order they were submitted.
    rejected = []
    for candidate in roles:
        if candidate not in allowed:
            rejected.append(candidate)

    if rejected:
        listing = ', '.join(rejected)
        raise HTTPException(status_code=400, detail=f"Invalid roles: {listing}")

    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    target.roles = roles
    await db.commit()
    await db.refresh(target)
    return target