"""User administration routes: self lookup, listing, creation, update, deletion and role assignment."""

from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from ..core.dependencies import get_current_active_user, get_current_superuser
from ..core.security import get_password_hash
from ..db.database import get_db
from ..db.models import User
from ..db.schemas import UserCreate, UserInDB

router = APIRouter()

# Role names accepted by ``update_user_roles``.
# NOTE(review): ``create_user`` and ``update_user`` store ``roles`` without
# checking them against this list -- confirm whether that is intentional.
VALID_ROLES = ("user", "admin", "manager", "support")


async def _get_user_or_404(db: AsyncSession, user_id: int) -> User:
    """Load the user with primary key *user_id* or raise a 404 HTTPException."""
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user


async def _email_taken(
    db: AsyncSession,
    email: str,
    exclude_user_id: Optional[int] = None,
) -> bool:
    """Return True if a user other than *exclude_user_id* already has *email*."""
    stmt = select(User.id).where(User.email == email)
    if exclude_user_id is not None:
        stmt = stmt.where(User.id != exclude_user_id)
    # LIMIT 1 + first(): only existence matters, and this cannot raise
    # MultipleResultsFound if the table already contains duplicates.
    result = await db.execute(stmt.limit(1))
    return result.scalars().first() is not None


@router.get("/me", response_model=UserInDB)
async def read_user_me(current_user: User = Depends(get_current_active_user)):
    """Return the user resolved by the ``get_current_active_user`` dependency."""
    return current_user


@router.get("/", response_model=List[UserInDB])
async def list_users(
    skip: int = 0,
    limit: int = 10,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db),
) -> List[UserInDB]:
    """Return one page of users ordered by id.

    ``skip`` rows are skipped and at most ``limit`` rows are returned.
    Raises a 400 HTTPException when either value is negative.
    """
    # A negative OFFSET/LIMIT is either a database error or silently means
    # "no limit", depending on the backend, so reject it up front.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="skip and limit must be non-negative",
        )
    # Without an ORDER BY, consecutive pages may overlap or skip rows.
    stmt = select(User).order_by(User.id).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()


@router.post("/", response_model=UserInDB)
async def create_user(
    user: UserCreate,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db),
) -> UserInDB:
    """Create a user from *user*, storing a hash of the supplied password.

    Raises a 400 HTTPException when the email is already registered.
    """
    if await _email_taken(db, user.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    db_user = User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        roles=user.roles,
    )
    db.add(db_user)
    await db.commit()
    # Reload so database-generated values are present on the returned object.
    await db.refresh(db_user)
    return db_user


@router.put("/{user_id}", response_model=UserInDB)
async def update_user(
    user_id: int,
    user_update: UserCreate,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db),
) -> UserInDB:
    """Apply the explicitly-set fields of *user_update* to an existing user.

    Raises a 404 HTTPException when the user does not exist and a 400
    HTTPException when the new email belongs to a different user.
    """
    db_user = await _get_user_or_404(db, user_id)

    # Only fields the client actually sent are applied.
    update_data = user_update.dict(exclude_unset=True)

    # Mirror create_user's uniqueness rule so an update cannot move a user
    # onto an address that another account already holds.
    new_email = update_data.get("email")
    if (
        new_email is not None
        and new_email != db_user.email
        and await _email_taken(db, new_email, exclude_user_id=user_id)
    ):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # The model stores a hash, never the plain password.
    if "password" in update_data:
        update_data["hashed_password"] = get_password_hash(update_data.pop("password"))

    for field, value in update_data.items():
        setattr(db_user, field, value)

    await db.commit()
    await db.refresh(db_user)
    return db_user


@router.delete("/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db),
):
    """Delete the user with id *user_id*.

    Raises a 404 HTTPException when the user does not exist.
    """
    user = await _get_user_or_404(db, user_id)
    await db.delete(user)
    await db.commit()
    return {"status": "success", "message": "User deleted"}


@router.put("/{user_id}/roles", response_model=UserInDB)
async def update_user_roles(
    user_id: int,
    roles: List[str],
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db),
) -> UserInDB:
    """Replace a user's roles with *roles*.

    Raises a 400 HTTPException listing any role not in ``VALID_ROLES`` and a
    404 HTTPException when the user does not exist.
    """
    # Validate before touching the database so bad input never costs a query.
    invalid_roles = [role for role in roles if role not in VALID_ROLES]
    if invalid_roles:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid roles: {', '.join(invalid_roles)}",
        )

    user = await _get_user_or_404(db, user_id)
    user.roles = roles
    await db.commit()
    await db.refresh(user)
    return user