Spaces:
Running
Running
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from sqlalchemy.orm import Session | |
| from typing import List | |
| from src.infrastructure.database import get_db | |
| from src.core.domain.schemas import UserCreate, UserResponse, UserUpdate, PasswordChange, AdminUserUpdate | |
| from src.core.domain.db_models import User, UserRole | |
| from src.core.security import ( | |
| get_password_hash, verify_password, | |
| get_current_user, require_super_admin | |
| ) | |
| router = APIRouter() | |
| # ββ Public ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def register(user: UserCreate, db: Session = Depends(get_db)): | |
| """Register a new user account (role defaults to 'user').""" | |
| existing = db.query(User).filter( | |
| (User.username == user.username) | (User.email == user.email) | |
| ).first() | |
| if existing: | |
| raise HTTPException(status_code=400, detail="Username or email already registered") | |
| new_user = User( | |
| username=user.username, | |
| email=user.email, | |
| full_name=user.full_name, | |
| hashed_password=get_password_hash(user.password), | |
| role=UserRole.user, | |
| ) | |
| db.add(new_user) | |
| db.commit() | |
| db.refresh(new_user) | |
| return new_user | |
| # ββ Authenticated user ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def update_profile( | |
| body: UserUpdate, | |
| current_user: User = Depends(get_current_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update own profile (username, full_name, email).""" | |
| if body.username and body.username != current_user.username: | |
| if db.query(User).filter(User.username == body.username).first(): | |
| raise HTTPException(status_code=400, detail="Username already taken") | |
| current_user.username = body.username | |
| if body.email and body.email != current_user.email: | |
| if db.query(User).filter(User.email == body.email).first(): | |
| raise HTTPException(status_code=400, detail="Email already in use") | |
| current_user.email = body.email | |
| if body.full_name is not None: | |
| current_user.full_name = body.full_name | |
| db.commit() | |
| db.refresh(current_user) | |
| return current_user | |
| def change_password( | |
| body: PasswordChange, | |
| current_user: User = Depends(get_current_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Change own password.""" | |
| if not verify_password(body.current_password, current_user.hashed_password): | |
| raise HTTPException(status_code=400, detail="Current password is incorrect") | |
| current_user.hashed_password = get_password_hash(body.new_password) | |
| db.commit() | |
| # ββ Super admin only ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def list_users( | |
| skip: int = 0, | |
| limit: int = 50, | |
| _admin: User = Depends(require_super_admin), | |
| db: Session = Depends(get_db) | |
| ): | |
| """List all users (super_admin only).""" | |
| return db.query(User).offset(skip).limit(limit).all() | |
| def get_user( | |
| user_id: int, | |
| _admin: User = Depends(require_super_admin), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get a specific user by ID (super_admin only).""" | |
| user = db.query(User).filter(User.id == user_id).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user | |
| def admin_update_user( | |
| user_id: int, | |
| body: AdminUserUpdate, | |
| _admin: User = Depends(require_super_admin), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update a user's role or active status (super_admin only).""" | |
| user = db.query(User).filter(User.id == user_id).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| if body.is_active is not None: | |
| user.is_active = body.is_active | |
| if body.role is not None: | |
| try: | |
| user.role = UserRole(body.role) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid role. Must be 'super_admin' or 'user'") | |
| db.commit() | |
| db.refresh(user) | |
| return user | |
| def delete_user( | |
| user_id: int, | |
| admin: User = Depends(require_super_admin), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Delete a user (super_admin only). Cannot delete yourself.""" | |
| if user_id == admin.id: | |
| raise HTTPException(status_code=400, detail="Cannot delete your own account") | |
| user = db.query(User).filter(User.id == user_id).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| db.delete(user) | |
| db.commit() | |