Spaces:
Running
Running
| from pydantic import BaseModel, Field | |
| from fastapi import APIRouter, Depends, HTTPException, status, Request | |
| from fastapi.security import HTTPAuthorizationCredentials | |
| from datetime import timedelta | |
| from typing import Optional | |
| import logging | |
| from app.system_users.services.service import SystemUserService | |
| from app.system_users.schemas.schema import ( | |
| LoginRequest, LoginResponse, CreateUserRequest, UpdateUserRequest, | |
| ChangePasswordRequest, ForgotPasswordRequest, VerifyResetTokenRequest, | |
| ResetPasswordRequest, UserInfoResponse, UserListResponse, UserListRequest, | |
| StandardResponse, UserStatus | |
| ) | |
| from app.system_users.models.model import SystemUserModel | |
| from app.dependencies.auth import get_current_user, require_admin_role, get_system_user_service | |
| from app.core.config import settings | |
| logger = logging.getLogger(__name__) | |
| # Router must be defined before any usage | |
| router = APIRouter( | |
| prefix="/auth", | |
| tags=["Authentication & User Management"] | |
| ) | |
| # --- Staff Mobile OTP Login --- | |
| class StaffMobileOTPLoginRequest(BaseModel): | |
| phone: str = Field(..., description="Staff mobile number") | |
| otp: str = Field(..., description="One-time password") | |
| class StaffMobileOTPLoginResponse(BaseModel): | |
| access_token: str | |
| token_type: str = "bearer" | |
| expires_in: int | |
| user_info: 'UserInfoResponse' | |
| async def staff_login_mobile_otp( | |
| request: Request, | |
| login_data: StaffMobileOTPLoginRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Staff login using mobile number and OTP (OTP hardcoded as 123456). | |
| """ | |
| if not login_data.phone or not login_data.otp: | |
| raise HTTPException(status_code=400, detail="Phone and OTP are required") | |
| if login_data.otp != "123456": | |
| raise HTTPException(status_code=401, detail="Invalid OTP") | |
| # Find user by phone | |
| user = await user_service.get_user_by_phone(login_data.phone) | |
| if not user: | |
| raise HTTPException(status_code=401, detail="Staff user not found for this phone number") | |
| # Only allow staff/employee roles (not admin/super_admin) | |
| if user.role in ("admin", "super_admin"): | |
| raise HTTPException(status_code=403, detail="Admin login not allowed via staff OTP login") | |
| # Create access token for staff user | |
| from datetime import timedelta | |
| from app.core.config import settings | |
| access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) | |
| access_token = user_service.create_access_token( | |
| data={ | |
| "sub": user.user_id, | |
| "username": user.username, | |
| "role": user.role, | |
| "merchant_id": user.merchant_id, | |
| "merchant_type": user.merchant_type | |
| }, | |
| expires_delta=access_token_expires | |
| ) | |
| user_info = user_service.convert_to_user_info_response(user) | |
| return StaffMobileOTPLoginResponse( | |
| access_token=access_token, | |
| token_type="bearer", | |
| expires_in=int(access_token_expires.total_seconds()), | |
| user_info=user_info | |
| ) | |
| async def login( | |
| request: Request, | |
| login_data: LoginRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Authenticate user and return access token. | |
| Raises: | |
| HTTPException: 400 - Missing required fields | |
| HTTPException: 401 - Invalid credentials or account locked | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate input | |
| if not login_data.email_or_phone or not login_data.email_or_phone.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email, phone, or username is required" | |
| ) | |
| if not login_data.password or not login_data.password.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Password is required" | |
| ) | |
| # Get client IP and user agent | |
| client_ip = request.client.host if request.client else None | |
| user_agent = request.headers.get("User-Agent") | |
| # Authenticate user | |
| try: | |
| user, message = await user_service.authenticate_user( | |
| email_or_phone=login_data.email_or_phone, | |
| password=login_data.password, | |
| ip_address=client_ip, | |
| user_agent=user_agent | |
| ) | |
| except Exception as auth_error: | |
| logger.error(f"Authentication error: {auth_error}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Authentication service error" | |
| ) | |
| if not user: | |
| logger.warning(f"Login failed for {login_data.email_or_phone}: {message}") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail=message, | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| # Create access token | |
| try: | |
| access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) | |
| if login_data.remember_me: | |
| access_token_expires = timedelta(hours=settings.REMEMBER_ME_TOKEN_HOURS) | |
| access_token = user_service.create_access_token( | |
| data={ | |
| "sub": user.user_id, | |
| "username": user.username, | |
| "role": user.role, | |
| "merchant_id": user.merchant_id, | |
| "merchant_type": user.merchant_type | |
| }, | |
| expires_delta=access_token_expires | |
| ) | |
| except Exception as token_error: | |
| logger.error(f"Error creating token: {token_error}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to generate authentication token" | |
| ) | |
| # Convert user to response model | |
| try: | |
| user_info = user_service.convert_to_user_info_response(user) | |
| except Exception as convert_error: | |
| logger.error(f"Error converting user info: {convert_error}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to format user information" | |
| ) | |
| logger.info(f"User logged in successfully: {user.username}") | |
| return LoginResponse( | |
| access_token=access_token, | |
| token_type="bearer", | |
| expires_in=int(access_token_expires.total_seconds()), | |
| user_info=user_info | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected login error: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="An unexpected error occurred during login" | |
| ) | |
| async def get_current_user_info( | |
| current_user: SystemUserModel = Depends(get_current_user), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Get current user information. | |
| Raises: | |
| HTTPException: 401 - Unauthorized (invalid or missing token) | |
| HTTPException: 500 - Server error | |
| """ | |
| try: | |
| return user_service.convert_to_user_info_response(current_user) | |
| except AttributeError as e: | |
| logger.error(f"Error accessing user attributes: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Error retrieving user information" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error getting current user info: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="An unexpected error occurred" | |
| ) | |
| async def create_user( | |
| user_data: CreateUserRequest, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Create a new user account. Requires admin privileges. | |
| Raises: | |
| HTTPException: 400 - Invalid data or user already exists | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Additional validation | |
| if not user_data.username or not user_data.username.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Username is required" | |
| ) | |
| if not user_data.email or not user_data.email.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email is required" | |
| ) | |
| if not user_data.password or len(user_data.password) < 8: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Password must be at least 8 characters long" | |
| ) | |
| new_user = await user_service.create_user(user_data, current_user.user_id) | |
| logger.info(f"User created successfully by {current_user.username}: {new_user.username}") | |
| return user_service.convert_to_user_info_response(new_user) | |
| except HTTPException: | |
| raise | |
| except ValueError as e: | |
| logger.error(f"Validation error creating user: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error creating user: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to create user" | |
| ) | |
| async def list_users( | |
| page: int = 1, | |
| page_size: int = 20, | |
| status_filter: Optional[UserStatus] = None, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| List users with pagination. Requires admin privileges. | |
| Raises: | |
| HTTPException: 400 - Invalid pagination parameters | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate pagination parameters | |
| if page < 1: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Page number must be greater than 0" | |
| ) | |
| if page_size < 1: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Page size must be greater than 0" | |
| ) | |
| if page_size > settings.MAX_PAGE_SIZE: | |
| logger.info(f"Page size {page_size} exceeds max, setting to {settings.MAX_PAGE_SIZE}") | |
| page_size = settings.MAX_PAGE_SIZE | |
| users, total_count = await user_service.list_users(page, page_size, status_filter) | |
| user_responses = [ | |
| user_service.convert_to_user_info_response(user) for user in users | |
| ] | |
| return UserListResponse( | |
| users=user_responses, | |
| total_count=total_count, | |
| page=page, | |
| page_size=page_size | |
| ) | |
| except HTTPException: | |
| raise | |
| except ValueError as e: | |
| logger.error(f"Validation error listing users: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error listing users: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to retrieve users" | |
| ) | |
| async def list_users_with_projection( | |
| payload: UserListRequest, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| List users with optional filters, pagination, and field projection. | |
| Supports filtering by status, role, merchant_id, merchant_type with MongoDB projection for performance optimization. | |
| **Request Body:** | |
| - `filters`: Additional filter criteria | |
| - `skip`: Number of records to skip (default: 0) | |
| - `limit`: Maximum records to return (default: 100, max: 1000) | |
| - `projection_list`: List of fields to include in response | |
| - `status_filter`: Filter by user status (active, inactive, suspended, etc.) | |
| - `role_filter`: Filter by user role | |
| - `merchant_id_filter`: Filter by merchant ID | |
| - `merchant_type_filter`: Filter by merchant type (ncnf, cnf, distributor, retail) | |
| **Projection Fields Available:** | |
| user_id, username, email, merchant_id, merchant_type, first_name, last_name, | |
| role, status, permissions, last_login_at, created_at, updated_at, phone, timezone, language | |
| **Benefits:** | |
| - Reduced payload size (50-90% reduction possible) | |
| - Better performance with field projection | |
| - Flexible filtering options | |
| Raises: | |
| HTTPException: 400 - Invalid parameters | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate limit | |
| if payload.limit < 1: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Limit must be greater than 0" | |
| ) | |
| if payload.limit > 1000: | |
| logger.info(f"Limit {payload.limit} exceeds max 1000, setting to 1000") | |
| payload.limit = 1000 | |
| if payload.skip < 0: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Skip must be 0 or greater" | |
| ) | |
| # Call service with projection support | |
| users = await user_service.list_users_with_projection( | |
| filters=payload.filters, | |
| skip=payload.skip, | |
| limit=payload.limit, | |
| projection_list=payload.projection_list, | |
| status_filter=payload.status_filter, | |
| role_filter=payload.role_filter, | |
| merchant_id_filter=payload.merchant_id_filter, | |
| merchant_type_filter=payload.merchant_type_filter | |
| ) | |
| # Return raw dict if projection used, otherwise convert to response models | |
| if payload.projection_list: | |
| return { | |
| "success": True, | |
| "data": users, | |
| "count": len(users), | |
| "projection_applied": True, | |
| "projected_fields": payload.projection_list | |
| } | |
| else: | |
| user_responses = [ | |
| user_service.convert_to_user_info_response(user) for user in users | |
| ] | |
| return { | |
| "success": True, | |
| "data": user_responses, | |
| "count": len(user_responses), | |
| "projection_applied": False | |
| } | |
| except HTTPException: | |
| raise | |
| except ValueError as e: | |
| logger.error(f"Validation error in list_users_with_projection: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error listing users with projection: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to retrieve users" | |
| ) | |
| async def get_user_by_id( | |
| user_id: str, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Get user by ID. Requires admin privileges. | |
| Raises: | |
| HTTPException: 400 - Invalid user ID | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 404 - User not found | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate user_id | |
| if not user_id or not user_id.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="User ID is required" | |
| ) | |
| user = await user_service.get_user_by_id(user_id) | |
| if not user: | |
| logger.warning(f"User not found: {user_id}") | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| return user_service.convert_to_user_info_response(user) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error getting user {user_id}: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to retrieve user" | |
| ) | |
| async def update_user( | |
| user_id: str, | |
| update_data: UpdateUserRequest, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Update user information. Requires admin privileges. | |
| Raises: | |
| HTTPException: 400 - Invalid data or user ID | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 404 - User not found | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate user_id | |
| if not user_id or not user_id.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="User ID is required" | |
| ) | |
| # Check if any data to update | |
| if not update_data.dict(exclude_unset=True): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="No data provided for update" | |
| ) | |
| updated_user = await user_service.update_user(user_id, update_data, current_user.user_id) | |
| if not updated_user: | |
| logger.warning(f"User not found for update: {user_id}") | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| logger.info(f"User {user_id} updated by {current_user.username}") | |
| return user_service.convert_to_user_info_response(updated_user) | |
| except HTTPException: | |
| raise | |
| except ValueError as e: | |
| logger.error(f"Validation error updating user {user_id}: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error updating user {user_id}: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to update user" | |
| ) | |
| async def change_password( | |
| password_data: ChangePasswordRequest, | |
| current_user: SystemUserModel = Depends(get_current_user), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Change current user's password. | |
| Raises: | |
| HTTPException: 400 - Invalid password or missing fields | |
| HTTPException: 401 - Current password incorrect | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate passwords | |
| if not password_data.current_password or not password_data.current_password.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Current password is required" | |
| ) | |
| if not password_data.new_password or not password_data.new_password.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="New password is required" | |
| ) | |
| if len(password_data.new_password) < 8: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="New password must be at least 8 characters long" | |
| ) | |
| if password_data.current_password == password_data.new_password: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="New password must be different from current password" | |
| ) | |
| success = await user_service.change_password( | |
| user_id=current_user.user_id, | |
| current_password=password_data.current_password, | |
| new_password=password_data.new_password | |
| ) | |
| if not success: | |
| logger.warning(f"Failed password change attempt for user {current_user.user_id}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Current password is incorrect" | |
| ) | |
| logger.info(f"Password changed successfully for user {current_user.username}") | |
| return StandardResponse( | |
| success=True, | |
| message="Password changed successfully" | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error changing password for user {current_user.user_id}: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to change password" | |
| ) | |
| async def forgot_password( | |
| request_data: ForgotPasswordRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Request password reset link. Sends an email with reset link to the user. | |
| This endpoint always returns success to prevent email enumeration attacks. | |
| Raises: | |
| HTTPException: 400 - Invalid email format | |
| HTTPException: 500 - Server error | |
| """ | |
| try: | |
| # Validate email | |
| if not request_data.email or not request_data.email.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email is required" | |
| ) | |
| # Send password reset email | |
| # Note: We always return success to prevent email enumeration | |
| await user_service.send_password_reset_email(request_data.email) | |
| logger.info(f"Password reset requested for email: {request_data.email}") | |
| return StandardResponse( | |
| success=True, | |
| message="If the email exists in our system, a password reset link has been sent" | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error in forgot password: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to process password reset request" | |
| ) | |
| async def verify_reset_token( | |
| request_data: VerifyResetTokenRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Verify if a password reset token is valid. | |
| Use this endpoint to check if a token is valid before showing the reset password form. | |
| Raises: | |
| HTTPException: 400 - Invalid or expired token | |
| HTTPException: 500 - Server error | |
| """ | |
| try: | |
| # Validate token | |
| if not request_data.token or not request_data.token.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Reset token is required" | |
| ) | |
| # Verify token | |
| token_data = await user_service.verify_password_reset_token(request_data.token) | |
| if not token_data: | |
| logger.warning("Invalid or expired reset token verification attempt") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid or expired reset token" | |
| ) | |
| return StandardResponse( | |
| success=True, | |
| message="Reset token is valid", | |
| data={"email": token_data.get("email")} | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error verifying reset token: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to verify reset token" | |
| ) | |
| async def reset_password( | |
| request_data: ResetPasswordRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Reset password using a valid reset token. | |
| The token is validated and can only be used once. After successful reset, | |
| the user can login with their new password. | |
| Raises: | |
| HTTPException: 400 - Invalid token or password requirements not met | |
| HTTPException: 500 - Server error | |
| """ | |
| try: | |
| # Validate inputs | |
| if not request_data.token or not request_data.token.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Reset token is required" | |
| ) | |
| if not request_data.new_password or not request_data.new_password.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="New password is required" | |
| ) | |
| if len(request_data.new_password) < 8: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Password must be at least 8 characters long" | |
| ) | |
| # Reset password | |
| success, message = await user_service.reset_password_with_token( | |
| token=request_data.token, | |
| new_password=request_data.new_password | |
| ) | |
| if not success: | |
| logger.warning(f"Password reset failed: {message}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=message | |
| ) | |
| logger.info("Password reset completed successfully") | |
| return StandardResponse( | |
| success=True, | |
| message="Password has been reset successfully. You can now login with your new password." | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error resetting password: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to reset password" | |
| ) | |
| async def deactivate_user( | |
| user_id: str, | |
| current_user: SystemUserModel = Depends(require_admin_role), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Deactivate user account. Requires admin privileges. | |
| Raises: | |
| HTTPException: 400 - Cannot deactivate own account or invalid user ID | |
| HTTPException: 403 - Insufficient permissions | |
| HTTPException: 404 - User not found | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate user_id | |
| if not user_id or not user_id.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="User ID is required" | |
| ) | |
| # Prevent self-deactivation | |
| if user_id == current_user.user_id: | |
| logger.warning(f"User {current_user.username} attempted to deactivate their own account") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Cannot deactivate your own account" | |
| ) | |
| success = await user_service.deactivate_user(user_id, current_user.user_id) | |
| if not success: | |
| logger.warning(f"User not found for deactivation: {user_id}") | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="User not found" | |
| ) | |
| logger.info(f"User {user_id} deactivated by {current_user.username}") | |
| return StandardResponse( | |
| success=True, | |
| message="User deactivated successfully" | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Unexpected error deactivating user {user_id}: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to deactivate user" | |
| ) | |
| async def logout( | |
| request: Request, | |
| current_user: SystemUserModel = Depends(get_current_user), | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Logout current user. | |
| Requires JWT token in Authorization header (Bearer token). | |
| Logs out the user and records the logout event for audit purposes. | |
| **Security:** | |
| - Validates JWT token before logout | |
| - Records logout event with IP address, user agent, and session duration | |
| - Stores audit log for compliance and security tracking | |
| **Note:** Since we're using stateless JWT tokens, the client is responsible for: | |
| - Removing the token from local storage/cookies | |
| - Clearing any cached user data | |
| - Redirecting to login page | |
| For enhanced security in production: | |
| - Consider implementing token blacklisting | |
| - Use short-lived access tokens with refresh tokens | |
| - Implement server-side session management if needed | |
| Raises: | |
| HTTPException: 401 - Unauthorized (invalid or missing token) | |
| HTTPException: 500 - Server error | |
| """ | |
| try: | |
| # Get client information for audit logging | |
| client_ip = request.client.host if request.client else None | |
| user_agent = request.headers.get("User-Agent") | |
| # Record logout for audit purposes | |
| await user_service.record_logout( | |
| user=current_user, | |
| ip_address=client_ip, | |
| user_agent=user_agent | |
| ) | |
| logger.info( | |
| f"User logged out successfully: {current_user.username}", | |
| extra={ | |
| "event": "logout_success", | |
| "user_id": current_user.user_id, | |
| "username": current_user.username, | |
| "ip_address": client_ip | |
| } | |
| ) | |
| return StandardResponse( | |
| success=True, | |
| message="Logged out successfully" | |
| ) | |
| except AttributeError as e: | |
| logger.error( | |
| f"Error accessing user during logout: {e}", | |
| extra={"error_type": "attribute_error"}, | |
| exc_info=True | |
| ) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Error during logout" | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Unexpected logout error: {str(e)}", | |
| extra={ | |
| "error_type": type(e).__name__, | |
| "user_id": current_user.user_id if current_user else None | |
| }, | |
| exc_info=True | |
| ) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="An unexpected error occurred during logout" | |
| ) | |
| # Create default super admin endpoint (for initial setup) | |
| async def create_super_admin( | |
| user_data: CreateUserRequest, | |
| user_service: SystemUserService = Depends(get_system_user_service) | |
| ): | |
| """ | |
| Create the first super admin user. Only works if no users exist in the system. | |
| Raises: | |
| HTTPException: 400 - Invalid data | |
| HTTPException: 403 - Super admin already exists | |
| HTTPException: 500 - Database or server error | |
| """ | |
| try: | |
| # Validate required fields | |
| if not user_data.username or not user_data.username.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Username is required" | |
| ) | |
| if not user_data.email or not user_data.email.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email is required" | |
| ) | |
| if not user_data.password or len(user_data.password) < 8: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Password must be at least 8 characters long" | |
| ) | |
| # Check if any users exist | |
| try: | |
| users, total_count = await user_service.list_users(page=1, page_size=1) | |
| except Exception as db_error: | |
| logger.error(f"Database error checking existing users: {db_error}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to verify system state" | |
| ) | |
| if total_count > 0: | |
| logger.warning("Attempted to create super admin when users already exist") | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="Super admin already exists or users are present in system" | |
| ) | |
| # Force super admin role | |
| user_data.role = "super_admin" | |
| # Create super admin | |
| super_admin = await user_service.create_user(user_data, "system") | |
| logger.info(f"Super admin created: {super_admin.username}") | |
| return user_service.convert_to_user_info_response(super_admin) | |
| except HTTPException: | |
| raise | |
| except ValueError as e: | |
| logger.error(f"Validation error creating super admin: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=str(e) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Unexpected error creating super admin: {str(e)}", exc_info=True) | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to create super admin" | |
| ) |