Spaces:
Running
Running
| """System User router for CRUD endpoints (authentication handled in auth service).""" | |
| from typing import List, Optional | |
| from fastapi import APIRouter, Depends, HTTPException, status | |
| from app.core.logging import get_logger | |
| from app.system_users.services.service import SystemUserService | |
| from app.system_users.schemas.schema import ( | |
| CreateUserRequest, | |
| UpdateUserRequest, | |
| UserInfoResponse, | |
| UserListResponse, | |
| StandardResponse, | |
| SystemUserListRequest, | |
| SuspendUserRequest, | |
| ResetPasswordAdminRequest, | |
| ) | |
| from app.dependencies.auth import ( | |
| get_current_user, | |
| get_system_user_service, | |
| require_admin_role, | |
| ) | |
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router for this module; it carries the "/system-users" path prefix and the
# "System Users" tag.
router = APIRouter(prefix="/system-users", tags=["System Users"])
async def list_users(
    payload: SystemUserListRequest,
    current_user = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    List users with pagination and optional field projection following API standards.

    **Request Body:**
    - `page`: Page number (default: 1)
    - `page_size`: Page size (default: 20, max: 100)
    - `status_filter`: Filter by user status (active, inactive, suspended, etc.)
    - `projection_list`: Optional list of fields to include in response

    **Response:**
    - `users`: Enriched user entries; when `projection_list` is given each entry
      is a flat mapping keyed by the requested field names (dot notation is kept
      as the key for nested fields, missing fields map to `null`)
    - `total_count`: Total count as reported by the service layer
    - `page` / `page_size`: The page served and the effective (capped) page size

    Users whose raw document cannot be found during enrichment are skipped, so
    `users` may hold fewer entries than the service returned.

    **Errors:**
    - HTTP errors raised further down the stack are passed through unchanged
    - Any other failure is logged and reported as `500 Failed to retrieve users`
    """
    try:
        page_size = min(payload.page_size, 100)  # Limit maximum page size

        # Always fetch full documents to enable merchant/security enrichment;
        # projection is applied to the enriched response below, not in the
        # service layer.
        users, total_count = await user_service.list_users(
            payload.page,
            page_size,
            payload.status_filter,
            None,  # Don't use projection in service layer
            current_user,
        )

        user_responses = []
        for user in users:
            # NOTE(review): this re-fetches every user individually (one query
            # per listed user). Consider a batched lookup if the service layer
            # can provide the raw documents directly.
            user_doc = await user_service.collection.find_one({"user_id": user.user_id})
            if not user_doc:
                continue

            user_response = await user_service.convert_to_user_info_response_with_merchant_and_security(user_doc)

            if payload.projection_list:
                # Filter the response down to only the requested fields.
                response_dict = user_response.model_dump(exclude_none=False)
                user_responses.append(
                    _project_user_fields(response_dict, payload.projection_list)
                )
            else:
                user_responses.append(user_response)

        return {
            "users": user_responses,
            "total_count": total_count,
            "page": payload.page,
            "page_size": page_size
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact (same pattern as the other
        # handlers in this module) instead of masking them as a 500.
        raise
    except Exception as e:
        logger.error(f"Error listing users: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve users"
        )


def _project_user_fields(response_dict, projection_list):
    """Build a dict holding only the fields named in *projection_list*."""
    return {
        field: _resolve_projected_field(response_dict, field)
        for field in projection_list
    }


def _resolve_projected_field(response_dict, field):
    """Look up *field* in *response_dict*; dotted names walk nested values, misses give None."""
    if "." not in field:
        return response_dict.get(field)

    # Handle nested fields like "security_settings.failed_login_attempts"
    value = response_dict
    for part in field.split("."):
        if isinstance(value, dict):
            value = value.get(part)
        elif value is not None:
            # Non-dict intermediate value: fall back to attribute access.
            try:
                value = getattr(value, part, None)
            except (AttributeError, TypeError):
                return None
        else:
            # Path ran into a missing value before all parts were consumed.
            return None
    return value
| # Specific routes must come before generic /{user_id} routes | |
async def suspend_user(
    user_id: str,
    suspend_data: SuspendUserRequest,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Suspend a user account, recording the reason. Requires admin privileges.

    **Request Body:**
    - `reason`: Reason for suspension (required)

    **Errors:**
    - `400` when the caller targets their own account
    - `404` when the service reports that nothing was suspended
    - `500` on any unexpected failure (logged)
    """
    try:
        # An admin must not be able to suspend themselves.
        if user_id == current_user.user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot suspend your own account",
            )

        was_suspended = await user_service.suspend_user(
            user_id, suspend_data.reason, current_user.user_id
        )
        if was_suspended:
            return StandardResponse(success=True, message="User suspended successfully")

        # A falsy service result is reported as an unknown user.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    except HTTPException:
        # Deliberate HTTP errors go out untouched.
        raise
    except Exception as e:
        logger.error(f"Error suspending user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to suspend user",
        )
async def reset_password_admin(
    user_id: str,
    reset_data: ResetPasswordAdminRequest,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Reset a user's password on behalf of an admin. Requires admin privileges.

    The service call yields a temporary password. It is included in the
    response only when `send_email` is false; otherwise `data` is empty.

    **Request Body:**
    - `send_email`: Send password reset email to user (default: true)

    **Errors:**
    - `404` when the service returns no temporary password
    - `500` on any unexpected failure (logged)
    """
    try:
        temp_password = await user_service.reset_password_admin(
            user_id, reset_data.send_email, current_user.user_id
        )
        if not temp_password:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )

        # Only hand the temporary password back when no email was requested.
        if reset_data.send_email:
            response_data = None
        else:
            response_data = {"temporary_password": temp_password}

        return StandardResponse(
            success=True,
            message="Password reset successfully",
            data=response_data,
        )
    except HTTPException:
        # Deliberate HTTP errors go out untouched.
        raise
    except Exception as e:
        logger.error(f"Error resetting password for user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to reset password",
        )
async def unlock_user(
    user_id: str,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Unlock a user account by clearing account lock and failed login attempts.
    Requires admin privileges.

    **Errors:**
    - `404` when the service reports that nothing was unlocked
    - `500` on any unexpected failure (logged)
    """
    try:
        was_unlocked = await user_service.unlock_user(user_id, current_user.user_id)
        if was_unlocked:
            return StandardResponse(success=True, message="User unlocked successfully")

        # A falsy service result is reported as an unknown user.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    except HTTPException:
        # Deliberate HTTP errors go out untouched.
        raise
    except Exception as e:
        logger.error(f"Error unlocking user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to unlock user",
        )
| # Generic routes come after specific routes | |
async def get_user_by_id(
    user_id: str,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Get user by ID with merchant details and security settings. Requires admin privileges.

    **Errors:**
    - `404` when no document exists for `user_id`
    - `500` on any unexpected failure (logged)
    """
    # `current_user` is not read here; the dependency is declared so that
    # `require_admin_role` runs for this endpoint.
    try:
        user_doc = await user_service.collection.find_one({"user_id": user_id})
        if not user_doc:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )
        return await user_service.convert_to_user_info_response_with_merchant_and_security(user_doc)
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) go out untouched.
        raise
    except Exception as e:
        # Same handling as the other handlers in this module: log the failure
        # and return a controlled 500 instead of letting it propagate unlogged.
        logger.error(f"Error retrieving user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to retrieve user"
        )
async def update_user(
    user_id: str,
    update_data: UpdateUserRequest,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Update a user and return the refreshed record with merchant details and
    security settings. Requires admin privileges.

    **Errors:**
    - `404` when the service returns no updated user
    - `500` on any unexpected failure (logged)
    """
    try:
        updated_user = await user_service.update_user(
            user_id, update_data, current_user.user_id
        )
        if updated_user:
            # Re-read the stored document so the response carries merchant and
            # security information.
            refreshed_doc = await user_service.collection.find_one({"user_id": user_id})
            if not refreshed_doc:
                # Document not found: fall back to the basic conversion of the
                # object returned by the update call.
                return user_service.convert_to_user_info_response(updated_user)
            return await user_service.convert_to_user_info_response_with_merchant_and_security(refreshed_doc)

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    except HTTPException:
        # Deliberate HTTP errors go out untouched.
        raise
    except Exception as e:
        logger.error(f"Error updating user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update user",
        )
async def deactivate_user(
    user_id: str,
    current_user = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Deactivate a user account. Requires admin privileges.

    **Errors:**
    - `400` when the caller targets their own account
    - `404` when the service reports that nothing was deactivated
    - `500` on any unexpected failure (logged)
    """
    try:
        # An admin must not be able to deactivate themselves.
        if user_id == current_user.user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Cannot deactivate your own account",
            )

        was_deactivated = await user_service.deactivate_user(user_id, current_user.user_id)
        if was_deactivated:
            return StandardResponse(success=True, message="User deactivated successfully")

        # A falsy service result is reported as an unknown user.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    except HTTPException:
        # Deliberate HTTP errors go out untouched.
        raise
    except Exception as e:
        logger.error(f"Error deactivating user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to deactivate user",
        )