Spaces:
Runtime error
Runtime error
File size: 7,879 Bytes
3674b4b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 | from fastapi import APIRouter, HTTPException, status, Request
from backend.core.dependencies import DbSession, AdminUser
from backend.schemas.user import UserCreate, UserUpdate, UserResponse
from backend.services.auth import create_user, get_user_by_email
from backend.models import User, AuditLog
# Router that the user-management endpoints in this module register on.
# NOTE(review): the URL prefix is applied wherever this router is included — not visible here.
router = APIRouter()
@router.get("", response_model=list[UserResponse])
def list_users(db: DbSession, admin: AdminUser):
    """Return every stored user as a ``UserResponse``.

    ``admin`` is not read in the body; presumably the ``AdminUser``
    dependency is what restricts this route to administrators (confirm in
    ``backend.core.dependencies``).
    """
    payload = []
    for account in db.query(User).all():
        # ``account.groups`` holds link objects; only the linked group's
        # name is exposed in the response.
        group_names = [membership.group.name for membership in account.groups]
        payload.append(
            UserResponse(
                id=account.id,
                email=account.email,
                full_name=account.full_name,
                is_admin=account.is_admin,
                is_active=account.is_active,
                created_at=account.created_at,
                last_login=account.last_login,
                groups=group_names,
            )
        )
    return payload
@router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_new_user(user_data: UserCreate, db: DbSession, admin: AdminUser, request: Request):
    """Create a user, write a ``user_created`` audit entry, and return the user.

    Raises:
        HTTPException: 400 when ``get_user_by_email`` already finds a user
            for ``user_data.email``.
    """
    if get_user_by_email(db, user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this email already exists",
        )

    created = create_user(
        db,
        email=user_data.email,
        password=user_data.password,
        full_name=user_data.full_name,
        is_admin=user_data.is_admin,
    )

    # ``request.client`` may be absent, in which case no address is recorded.
    client = request.client
    if client:
        origin_ip = client.host
    else:
        origin_ip = None

    snapshot = {
        "email": created.email,
        "full_name": created.full_name,
        "is_admin": created.is_admin,
    }
    entry = AuditLog(
        user_id=admin.id,
        action="user_created",
        resource_type="user",
        resource_id=created.id,
        details=snapshot,
        ip_address=origin_ip,
    )
    db.add(entry)
    db.commit()

    # The response is built with an empty group list.
    return UserResponse(
        id=created.id,
        email=created.email,
        full_name=created.full_name,
        is_admin=created.is_admin,
        is_active=created.is_active,
        created_at=created.created_at,
        groups=[],
    )
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: str, db: DbSession, admin: AdminUser):
    """Look up one user by id and return it as a ``UserResponse``.

    Raises:
        HTTPException: 404 when no row matches ``user_id``.
    """
    match = db.query(User).filter(User.id == user_id).first()
    if match:
        group_names = [membership.group.name for membership in match.groups]
        return UserResponse(
            id=match.id,
            email=match.email,
            full_name=match.full_name,
            is_admin=match.is_admin,
            is_active=match.is_active,
            created_at=match.created_at,
            last_login=match.last_login,
            groups=group_names,
        )

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="User not found",
    )
@router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: str, user_data: UserUpdate, db: DbSession, admin: AdminUser, request: Request):
    """Apply a partial update to a user and record audit entries for it.

    Only fields of ``user_data`` that are not ``None`` are applied. A password
    change also bumps ``token_version``. The field changes and their audit
    entries are committed together in a single transaction, so a change can
    never be persisted without its audit trail.

    Audit actions emitted (in this order, each only when applicable):
    ``password_changed``, ``admin_granted``/``admin_revoked``,
    ``user_activated``/``user_deactivated``, and ``user_updated`` (email or
    full-name changes, with old/new values).

    Raises:
        HTTPException: 404 when no row matches ``user_id``; 400 when the
            request would deactivate an admin account, remove admin rights
            from the last active admin, or change the email to one that
            already belongs to another user.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    # Remember the flags before mutation so the audit step can tell whether
    # they actually changed.
    old_is_admin = user.is_admin
    old_is_active = user.is_active

    # All validation happens before the first mutation so that a rejected
    # request leaves the session untouched.
    if user.is_admin:
        if user_data.is_active is not None and not user_data.is_active:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot deactivate an admin account. Remove admin privileges first."
            )
        if user_data.is_admin is not None and not user_data.is_admin:
            # Check if this is the last admin
            admin_count = db.query(User).filter(User.is_admin == True, User.is_active == True).count()  # noqa: E712
            if admin_count <= 1:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="Cannot remove admin privileges from the last admin account."
                )

    email_changed = user_data.email is not None and user_data.email != user.email
    if email_changed:
        # Mirror the duplicate-email guard used when creating users; without
        # it a clash only surfaces as a database error at commit time.
        existing = get_user_by_email(db, user_data.email)
        if existing and existing.id != user.id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="User with this email already exists"
            )

    # Old/new values for the general "user_updated" audit entry.
    changes = {}
    if email_changed:
        changes["email"] = {"old": user.email, "new": user_data.email}
        user.email = user_data.email
    if user_data.full_name is not None and user_data.full_name != user.full_name:
        changes["full_name"] = {"old": user.full_name, "new": user_data.full_name}
        user.full_name = user_data.full_name
    if user_data.is_admin is not None:
        user.is_admin = user_data.is_admin
    if user_data.is_active is not None:
        user.is_active = user_data.is_active
    if user_data.password is not None:
        from backend.core.security import get_password_hash
        user.password_hash = get_password_hash(user_data.password)
        user.token_version += 1  # Invalidate all existing tokens

    # Collect (action, details) pairs for every noteworthy change.
    audit_events = []
    if user_data.password is not None:
        audit_events.append(
            ("password_changed", {"target_email": user.email, "changed_by": admin.email})
        )
    if user_data.is_admin is not None and old_is_admin != user.is_admin:
        audit_events.append(
            ("admin_granted" if user.is_admin else "admin_revoked", {"target_email": user.email})
        )
    if user_data.is_active is not None and old_is_active != user.is_active:
        audit_events.append(
            ("user_activated" if user.is_active else "user_deactivated", {"target_email": user.email})
        )
    if changes:
        audit_events.append(
            ("user_updated", {"target_email": user.email, "changes": changes})
        )

    ip_address = request.client.host if request.client else None
    for action, details in audit_events:
        db.add(
            AuditLog(
                user_id=admin.id,
                action=action,
                resource_type="user",
                resource_id=user.id,
                details=details,
                ip_address=ip_address,
            )
        )

    # Single commit: the update and its audit rows succeed or fail together.
    db.commit()
    db.refresh(user)

    return UserResponse(
        id=user.id,
        email=user.email,
        full_name=user.full_name,
        is_admin=user.is_admin,
        is_active=user.is_active,
        created_at=user.created_at,
        last_login=user.last_login,
        groups=[ug.group.name for ug in user.groups]
    )
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: str, db: DbSession, admin: AdminUser, request: Request):
    """Delete a non-admin user and record a ``user_deleted`` audit entry.

    The audit row and the deletion are added to the session and committed
    together. Nothing is returned; the route is declared with status 204.

    Raises:
        HTTPException: 404 when no row matches ``user_id``; 400 when the
            target user has ``is_admin`` set.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Admin accounts must be demoted before they can be removed.
    if target.is_admin:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete an admin account. Remove admin privileges first.",
        )

    # ``request.client`` may be absent, in which case no address is recorded.
    client = request.client
    if client:
        origin_ip = client.host
    else:
        origin_ip = None

    # Capture identifying fields in the audit entry before the row is deleted.
    snapshot = {"email": target.email, "full_name": target.full_name}
    entry = AuditLog(
        user_id=admin.id,
        action="user_deleted",
        resource_type="user",
        resource_id=target.id,
        details=snapshot,
        ip_address=origin_ip,
    )
    db.add(entry)
    db.delete(target)
    db.commit()
|