# Page residue captured with this file (avatar caption, commit message, short commit hash):
# MukeshKapoor25's picture
# feat: Enhance user listing with full document retrieval and optional field projection
# cd0e8f8
"""System User router for CRUD endpoints (authentication handled in auth service)."""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from app.core.logging import get_logger
from app.system_users.services.service import SystemUserService
from app.system_users.schemas.schema import (
CreateUserRequest,
UpdateUserRequest,
UserInfoResponse,
UserListResponse,
StandardResponse,
SystemUserListRequest,
SuspendUserRequest,
ResetPasswordAdminRequest,
)
from app.dependencies.auth import (
get_current_user,
get_system_user_service,
require_admin_role,
)
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router for the system-user endpoints; every route below is mounted
# under the "/system-users" prefix and tagged "System Users".
router = APIRouter(prefix="/system-users", tags=["System Users"])
def _resolve_projected_field(response_dict: dict, field: str):
    """Look up ``field`` in ``response_dict``, following dots through nested dicts.

    Only dictionaries are traversed. ``field`` comes straight from the request
    body, so attribute access on leaf values is deliberately not attempted:
    a path such as ``"username.upper"`` must not hand back a bound method.

    Returns ``None`` when any segment is missing or a non-dict value is
    reached before the path is exhausted.
    """
    value = response_dict
    for part in field.split("."):
        if not isinstance(value, dict):
            return None
        value = value.get(part)
    return value


def _project_fields(response_dict: dict, projection_list: List[str]) -> dict:
    """Build a dict holding only the requested fields.

    Nested fields keep their dot-notation name as the key, e.g.
    ``"security_settings.failed_login_attempts"``.
    """
    return {
        field: _resolve_projected_field(response_dict, field)
        for field in projection_list
    }


@router.post("/list")
async def list_users(
    payload: SystemUserListRequest,
    current_user=Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    List users with pagination and optional field projection following API standards.

    **Request Body:**
    - `page`: Page number (default: 1)
    - `page_size`: Page size (default: 20, max: 100)
    - `status_filter`: Filter by user status (active, inactive, suspended, etc.)
    - `projection_list`: Optional list of fields to include in response;
      nested fields use dot notation and are returned under that dotted key

    **Returns:** a dict with `users`, `total_count`, `page` and `page_size`.

    **Raises:** `HTTPException` 500 on unexpected failures; an `HTTPException`
    raised by the service layer is passed through unchanged.
    """
    try:
        page_size = min(payload.page_size, 100)  # Limit maximum page size

        # Always fetch full documents to enable merchant/security enrichment;
        # projection is applied to the enriched response below instead.
        users, total_count = await user_service.list_users(
            payload.page,
            page_size,
            payload.status_filter,
            None,  # Don't use projection in service layer
            current_user
        )

        user_responses = []
        for user in users:
            # NOTE(review): one extra lookup per listed user (N+1). A batched
            # query would be cheaper, if the collection API allows it - confirm.
            user_doc = await user_service.collection.find_one({"user_id": user.user_id})
            if not user_doc:
                # The raw document could not be found; skip this user.
                continue

            user_response = await user_service.convert_to_user_info_response_with_merchant_and_security(user_doc)

            if payload.projection_list:
                user_responses.append(
                    _project_fields(
                        user_response.model_dump(exclude_none=False),
                        payload.projection_list,
                    )
                )
            else:
                user_responses.append(user_response)

        return {
            "users": user_responses,
            "total_count": total_count,
            "page": payload.page,
            "page_size": page_size
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact, as the sibling endpoints do.
        raise
    except Exception as e:
        logger.error(f"Error listing users: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve users"
        ) from e
# Specific routes must come before generic /{user_id} routes
@router.put("/{user_id}/suspend", response_model=StandardResponse)
async def suspend_user(
    user_id: str,
    suspend_data: SuspendUserRequest,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Suspend a user account, recording the reason. Requires admin privileges.

    **Request Body:**
    - `reason`: Reason for suspension (required)

    Responds with 400 when an admin targets their own account, 404 when the
    service reports nothing was suspended, and 500 on unexpected errors.
    """
    try:
        # An admin must not be able to suspend their own account.
        if user_id == current_user.user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot suspend your own account"
            )

        was_suspended = await user_service.suspend_user(
            user_id,
            suspend_data.reason,
            current_user.user_id
        )
        if was_suspended:
            return StandardResponse(
                success=True,
                message="User suspended successfully"
            )

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    except HTTPException:
        # Deliberate HTTP errors go back to the client untouched.
        raise
    except Exception as e:
        logger.error(f"Error suspending user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to suspend user"
        )
@router.put("/{user_id}/reset-password", response_model=StandardResponse)
async def reset_password_admin(
    user_id: str,
    reset_data: ResetPasswordAdminRequest,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Reset a user's password on behalf of an admin. Requires admin privileges.

    **Request Body:**
    - `send_email`: Send password reset email to user (default: true)

    The temporary password is included in the response `data` only when
    `send_email` is false. Responds with 404 when the service returns no
    password, and 500 on unexpected errors.
    """
    try:
        generated_password = await user_service.reset_password_admin(
            user_id,
            reset_data.send_email,
            current_user.user_id
        )
        if not generated_password:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        # Only expose the password in the response when no email is sent.
        if reset_data.send_email:
            response_data = None
        else:
            response_data = {"temporary_password": generated_password}

        return StandardResponse(
            success=True,
            message="Password reset successfully",
            data=response_data
        )
    except HTTPException:
        # Deliberate HTTP errors go back to the client untouched.
        raise
    except Exception as e:
        logger.error(f"Error resetting password for user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to reset password"
        )
@router.put("/{user_id}/unlock", response_model=StandardResponse)
async def unlock_user(
    user_id: str,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Unlock a user account via the user service. Requires admin privileges.

    Responds with 404 when the service reports nothing was unlocked, and
    500 on unexpected errors.
    """
    try:
        was_unlocked = await user_service.unlock_user(user_id, current_user.user_id)
        if was_unlocked:
            return StandardResponse(
                success=True,
                message="User unlocked successfully"
            )

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    except HTTPException:
        # Deliberate HTTP errors go back to the client untouched.
        raise
    except Exception as e:
        logger.error(f"Error unlocking user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to unlock user"
        )
# Generic routes come after specific routes
@router.get("/{user_id}", response_model=UserInfoResponse)
async def get_user_by_id(
    user_id: str,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Get user by ID with merchant details and security settings. Requires admin privileges.

    Responds with 404 when no document matches `user_id`. Unexpected lookup
    or conversion failures are logged and reported as 500, matching the
    error handling of the other endpoints in this router.
    """
    try:
        user_doc = await user_service.collection.find_one({"user_id": user_id})
        if not user_doc:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )
        return await user_service.convert_to_user_info_response_with_merchant_and_security(user_doc)
    except HTTPException:
        # Let the 404 above (and any other deliberate HTTP error) through.
        raise
    except Exception as e:
        logger.error(f"Error retrieving user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve user"
        ) from e
@router.put("/{user_id}", response_model=UserInfoResponse)
async def update_user(
    user_id: str,
    update_data: UpdateUserRequest,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Update a user and return the refreshed record. Requires admin privileges.

    The response is built from the stored document (with merchant and
    security details) when it can be re-read, otherwise from the basic
    conversion of the service result. Responds with 404 when the service
    returns no updated user, and 500 on unexpected errors.
    """
    try:
        saved_user = await user_service.update_user(user_id, update_data, current_user.user_id)
        if not saved_user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        # Re-read the stored document so the response carries merchant and security info.
        fresh_doc = await user_service.collection.find_one({"user_id": user_id})
        if not fresh_doc:
            # Document could not be re-read: fall back to the basic conversion.
            return user_service.convert_to_user_info_response(saved_user)

        return await user_service.convert_to_user_info_response_with_merchant_and_security(fresh_doc)
    except HTTPException:
        # Deliberate HTTP errors go back to the client untouched.
        raise
    except Exception as e:
        logger.error(f"Error updating user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update user"
        )
@router.delete("/{user_id}", response_model=StandardResponse)
async def deactivate_user(
    user_id: str,
    current_user=Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Deactivate a user account via the user service. Requires admin privileges.

    Responds with 400 when an admin targets their own account, 404 when the
    service reports nothing was deactivated, and 500 on unexpected errors.
    """
    try:
        # An admin must not be able to deactivate their own account.
        if user_id == current_user.user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot deactivate your own account"
            )

        was_deactivated = await user_service.deactivate_user(user_id, current_user.user_id)
        if was_deactivated:
            return StandardResponse(
                success=True,
                message="User deactivated successfully"
            )

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    except HTTPException:
        # Deliberate HTTP errors go back to the client untouched.
        raise
    except Exception as e:
        logger.error(f"Error deactivating user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to deactivate user"
        )