File size: 4,107 Bytes
3e5c7dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a3b84cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from ..db.database import get_db
from ..db.models import User
from ..db.schemas import UserCreate, UserInDB
from ..core.dependencies import get_current_superuser, get_current_active_user
from ..core.security import get_password_hash

router = APIRouter()

@router.get("/me", response_model=UserInDB)
async def read_user_me(current_user: User = Depends(get_current_active_user)):
    return current_user

@router.get("/", response_model=List[UserInDB])
async def list_users(

    skip: int = 0,

    limit: int = 10,

    current_user: User = Depends(get_current_superuser),

    db: AsyncSession = Depends(get_db)

) -> List[UserInDB]:
    stmt = select(User).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()

@router.post("/", response_model=UserInDB)
async def create_user(

    user: UserCreate,

    current_user: User = Depends(get_current_superuser),

    db: AsyncSession = Depends(get_db)

) -> UserInDB:
    # Check if email exists
    stmt = select(User).where(User.email == user.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
    
    # Create new user
    db_user = User(
        email=user.email,
        username=user.username,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
        is_active=user.is_active,
        is_superuser=user.is_superuser,
        roles=user.roles
    )
    
    db.add(db_user)
    await db.commit()
    await db.refresh(db_user)
    return db_user

@router.put("/{user_id}", response_model=UserInDB)
async def update_user(

    user_id: int,

    user_update: UserCreate,

    current_user: User = Depends(get_current_superuser),

    db: AsyncSession = Depends(get_db)

) -> UserInDB:
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalar_one_or_none()
    
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Update user fields
    update_data = user_update.dict(exclude_unset=True)
    if "password" in update_data:
        update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
    
    for field, value in update_data.items():
        setattr(db_user, field, value)
    
    await db.commit()
    await db.refresh(db_user)
    return db_user

@router.delete("/{user_id}")
async def delete_user(

    user_id: int,

    current_user: User = Depends(get_current_superuser),

    db: AsyncSession = Depends(get_db)

):
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    await db.delete(user)
    await db.commit()
    return {"status": "success", "message": "User deleted"}

@router.put("/{user_id}/roles", response_model=UserInDB)
async def update_user_roles(

    user_id: int,

    roles: List[str],

    current_user: User = Depends(get_current_superuser),

    db: AsyncSession = Depends(get_db)

) -> UserInDB:
    valid_roles = ["user", "admin", "manager", "support"]
    invalid_roles = [role for role in roles if role not in valid_roles]
    if invalid_roles:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid roles: {', '.join(invalid_roles)}"
        )
    
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    user.roles = roles
    await db.commit()
    await db.refresh(user)
    return user