richtext's picture
Upload folder using huggingface_hub
3674b4b verified
from fastapi import APIRouter, HTTPException, status, Request
from backend.core.dependencies import DbSession, AdminUser
from backend.schemas.user import UserCreate, UserUpdate, UserResponse
from backend.services.auth import create_user, get_user_by_email
from backend.models import User, AuditLog
router = APIRouter()
@router.get("", response_model=list[UserResponse])
def list_users(db: DbSession, admin: AdminUser):
users = db.query(User).all()
return [
UserResponse(
id=u.id,
email=u.email,
full_name=u.full_name,
is_admin=u.is_admin,
is_active=u.is_active,
created_at=u.created_at,
last_login=u.last_login,
groups=[ug.group.name for ug in u.groups]
)
for u in users
]
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_new_user(user_data: UserCreate, db: DbSession, admin: AdminUser, request: Request):
existing = get_user_by_email(db, user_data.email)
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User with this email already exists"
)
user = create_user(
db,
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name,
is_admin=user_data.is_admin
)
# Audit log
audit = AuditLog(
user_id=admin.id,
action="user_created",
resource_type="user",
resource_id=user.id,
details={"email": user.email, "full_name": user.full_name, "is_admin": user.is_admin},
ip_address=request.client.host if request.client else None
)
db.add(audit)
db.commit()
return UserResponse(
id=user.id,
email=user.email,
full_name=user.full_name,
is_admin=user.is_admin,
is_active=user.is_active,
created_at=user.created_at,
groups=[]
)
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: str, db: DbSession, admin: AdminUser):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
return UserResponse(
id=user.id,
email=user.email,
full_name=user.full_name,
is_admin=user.is_admin,
is_active=user.is_active,
created_at=user.created_at,
last_login=user.last_login,
groups=[ug.group.name for ug in user.groups]
)
@router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: str, user_data: UserUpdate, db: DbSession, admin: AdminUser, request: Request):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Track changes for audit logging
old_is_admin = user.is_admin
old_is_active = user.is_active
changes = {}
# Prevent deactivating or removing admin status from admin users
if user.is_admin:
if user_data.is_active is not None and not user_data.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot deactivate an admin account. Remove admin privileges first."
)
if user_data.is_admin is not None and not user_data.is_admin:
# Check if this is the last admin
admin_count = db.query(User).filter(User.is_admin == True, User.is_active == True).count()
if admin_count <= 1:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot remove admin privileges from the last admin account."
)
if user_data.email is not None and user_data.email != user.email:
changes["email"] = {"old": user.email, "new": user_data.email}
user.email = user_data.email
if user_data.full_name is not None and user_data.full_name != user.full_name:
changes["full_name"] = {"old": user.full_name, "new": user_data.full_name}
user.full_name = user_data.full_name
if user_data.is_admin is not None:
user.is_admin = user_data.is_admin
if user_data.is_active is not None:
user.is_active = user_data.is_active
if user_data.password is not None:
from backend.core.security import get_password_hash
user.password_hash = get_password_hash(user_data.password)
user.token_version += 1 # Invalidate all existing tokens
db.commit()
db.refresh(user)
ip_address = request.client.host if request.client else None
# Create specific audit logs for important changes
if user_data.password is not None:
audit = AuditLog(
user_id=admin.id,
action="password_changed",
resource_type="user",
resource_id=user.id,
details={"target_email": user.email, "changed_by": admin.email},
ip_address=ip_address
)
db.add(audit)
if user_data.is_admin is not None and old_is_admin != user.is_admin:
action = "admin_granted" if user.is_admin else "admin_revoked"
audit = AuditLog(
user_id=admin.id,
action=action,
resource_type="user",
resource_id=user.id,
details={"target_email": user.email},
ip_address=ip_address
)
db.add(audit)
if user_data.is_active is not None and old_is_active != user.is_active:
action = "user_activated" if user.is_active else "user_deactivated"
audit = AuditLog(
user_id=admin.id,
action=action,
resource_type="user",
resource_id=user.id,
details={"target_email": user.email},
ip_address=ip_address
)
db.add(audit)
# General update log
if changes:
audit = AuditLog(
user_id=admin.id,
action="user_updated",
resource_type="user",
resource_id=user.id,
details={"target_email": user.email, "changes": changes},
ip_address=ip_address
)
db.add(audit)
db.commit()
return UserResponse(
id=user.id,
email=user.email,
full_name=user.full_name,
is_admin=user.is_admin,
is_active=user.is_active,
created_at=user.created_at,
last_login=user.last_login,
groups=[ug.group.name for ug in user.groups]
)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: str, db: DbSession, admin: AdminUser, request: Request):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
)
# Prevent deleting admin accounts
if user.is_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot delete an admin account. Remove admin privileges first."
)
# Audit log before deletion
audit = AuditLog(
user_id=admin.id,
action="user_deleted",
resource_type="user",
resource_id=user.id,
details={"email": user.email, "full_name": user.full_name},
ip_address=request.client.host if request.client else None
)
db.add(audit)
db.delete(user)
db.commit()