# Author avatar caption: MukeshKapoor25's picture
# Commit message: feat(auth): Implement staff mobile OTP login and restructure user management routes
# Commit: 622d307
from pydantic import BaseModel, Field
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import HTTPAuthorizationCredentials
from datetime import timedelta
from typing import Optional
import logging
from app.system_users.services.service import SystemUserService
from app.system_users.schemas.schema import (
LoginRequest, LoginResponse, CreateUserRequest, UpdateUserRequest,
ChangePasswordRequest, ForgotPasswordRequest, VerifyResetTokenRequest,
ResetPasswordRequest, UserInfoResponse, UserListResponse, UserListRequest,
StandardResponse, UserStatus
)
from app.system_users.models.model import SystemUserModel
from app.dependencies.auth import get_current_user, require_admin_role, get_system_user_service
from app.core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The router has to exist before any route decorator below references it.
router = APIRouter(prefix="/auth", tags=["Authentication & User Management"])
# --- Staff Mobile OTP Login ---
# Request body accepted by the staff mobile-OTP login endpoint.
# Both fields are required strings.
class StaffMobileOTPLoginRequest(BaseModel):
    # Mobile number used to look up the staff account.
    phone: str = Field(
        ...,
        description="Staff mobile number",
    )
    # One-time password submitted together with the phone number.
    otp: str = Field(
        ...,
        description="One-time password",
    )
# Response body returned by the staff mobile-OTP login endpoint.
class StaffMobileOTPLoginResponse(BaseModel):
    # Signed access token issued for the staff user.
    access_token: str
    # Token scheme; defaults to "bearer".
    token_type: str = "bearer"
    # Token lifetime in seconds.
    expires_in: int
    # Serialized profile of the authenticated user.
    user_info: UserInfoResponse
@router.post("/staff/login/mobile-otp", response_model=StaffMobileOTPLoginResponse, summary="Staff login with mobile and OTP")
async def staff_login_mobile_otp(
    request: Request,
    login_data: StaffMobileOTPLoginRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Staff login using mobile number and OTP (OTP hardcoded as 123456).

    The user is looked up by phone number. Accounts whose role is ``admin``
    or ``super_admin`` are refused on this endpoint. On success an access
    token valid for ``settings.TOKEN_EXPIRATION_HOURS`` hours is issued.

    Raises:
        HTTPException: 400 - Phone or OTP missing
        HTTPException: 401 - Invalid OTP, or no user for the phone number
        HTTPException: 403 - Admin / super admin account used on staff login
        HTTPException: 500 - Unexpected server error
    """
    try:
        if not login_data.phone or not login_data.otp:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Phone and OTP are required"
            )

        # SECURITY(review): the OTP is a fixed, publicly documented value, so
        # anyone who knows a staff phone number can obtain a token. This must
        # be replaced by a real per-request OTP verification before this
        # endpoint is exposed outside development/testing.
        if login_data.otp != "123456":
            logger.warning("Staff OTP login rejected: invalid OTP")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid OTP"
            )

        # Find user by phone
        user = await user_service.get_user_by_phone(login_data.phone)
        if not user:
            logger.warning("Staff OTP login rejected: no user for supplied phone number")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Staff user not found for this phone number"
            )

        # Only allow staff/employee roles (not admin/super_admin)
        if user.role in ("admin", "super_admin"):
            logger.warning(f"Staff OTP login rejected for privileged account: {user.username}")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin login not allowed via staff OTP login"
            )

        # Create access token for staff user. `timedelta` and `settings` are
        # the module-level imports; no function-local re-import is needed.
        access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
        access_token = user_service.create_access_token(
            data={
                "sub": user.user_id,
                "username": user.username,
                "role": user.role,
                "merchant_id": user.merchant_id,
                "merchant_type": user.merchant_type
            },
            expires_delta=access_token_expires
        )
        user_info = user_service.convert_to_user_info_response(user)

        logger.info(f"Staff user logged in via mobile OTP: {user.username}")
        return StaffMobileOTPLoginResponse(
            access_token=access_token,
            token_type="bearer",
            expires_in=int(access_token_expires.total_seconds()),
            user_info=user_info
        )
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Unexpected staff OTP login error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during login"
        )
@router.post("/login", response_model=LoginResponse)
async def login(
    request: Request,
    login_data: LoginRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Verify the supplied credentials and issue an access token.

    The identifier and the password must both be non-blank. The caller's IP
    address and User-Agent header are forwarded to the authentication
    service. When ``remember_me`` is set, the token lifetime is taken from
    ``settings.REMEMBER_ME_TOKEN_HOURS`` instead of
    ``settings.TOKEN_EXPIRATION_HOURS``.

    Raises:
        HTTPException: 400 - Identifier or password missing or blank
        HTTPException: 401 - Credentials rejected (detail is the service message)
        HTTPException: 500 - Authentication, token or serialization failure
    """
    try:
        # Reject blank identifier / password before touching the service.
        if not (login_data.email_or_phone and login_data.email_or_phone.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email, phone, or username is required")
        if not (login_data.password and login_data.password.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password is required")

        # Request metadata handed to the authentication service.
        remote_addr = None
        if request.client:
            remote_addr = request.client.host
        agent_header = request.headers.get("User-Agent")

        # Step 1: authenticate.
        try:
            user, message = await user_service.authenticate_user(
                email_or_phone=login_data.email_or_phone,
                password=login_data.password,
                ip_address=remote_addr,
                user_agent=agent_header,
            )
        except Exception as auth_error:
            logger.error(f"Authentication error: {auth_error}", exc_info=True)
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Authentication service error")

        if not user:
            logger.warning(f"Login failed for {login_data.email_or_phone}: {message}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=message,
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Step 2: build the token; "remember me" swaps in the longer lifetime.
        try:
            token_lifetime = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
            if login_data.remember_me:
                token_lifetime = timedelta(hours=settings.REMEMBER_ME_TOKEN_HOURS)
            claims = {
                "sub": user.user_id,
                "username": user.username,
                "role": user.role,
                "merchant_id": user.merchant_id,
                "merchant_type": user.merchant_type,
            }
            token = user_service.create_access_token(data=claims, expires_delta=token_lifetime)
        except Exception as token_error:
            logger.error(f"Error creating token: {token_error}", exc_info=True)
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate authentication token")

        # Step 3: serialize the user for the response.
        try:
            profile = user_service.convert_to_user_info_response(user)
        except Exception as convert_error:
            logger.error(f"Error converting user info: {convert_error}", exc_info=True)
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to format user information")

        logger.info(f"User logged in successfully: {user.username}")
        return LoginResponse(
            access_token=token,
            token_type="bearer",
            expires_in=int(token_lifetime.total_seconds()),
            user_info=profile,
        )
    except HTTPException:
        # Errors raised deliberately above keep their status and detail.
        raise
    except Exception as e:
        logger.error(f"Unexpected login error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred during login")
@router.get("/me", response_model=UserInfoResponse)
async def get_current_user_info(
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Return the profile of the user resolved by the ``get_current_user`` dependency.

    Raises:
        HTTPException: 401 - Unauthorized (raised by the auth dependency)
        HTTPException: 500 - The user could not be converted to a response
    """
    try:
        profile = user_service.convert_to_user_info_response(current_user)
    except AttributeError as e:
        # The user object lacks an attribute the converter expects.
        logger.error(f"Error accessing user attributes: {e}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error retrieving user information")
    except Exception as e:
        logger.error(f"Unexpected error getting current user info: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred")
    else:
        return profile
@router.post("/users", response_model=UserInfoResponse)
async def create_user(
    user_data: CreateUserRequest,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Create a user account on behalf of an administrator.

    Username and email must be non-blank and the password at least eight
    characters long. A ``ValueError`` from the service is reported as 400.

    Raises:
        HTTPException: 400 - Invalid data, or ValueError raised by the service
        HTTPException: 403 - Insufficient permissions (admin dependency)
        HTTPException: 500 - Database or server error
    """
    try:
        # Field checks, in the same order the errors are reported.
        if not (user_data.username and user_data.username.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username is required")
        if not (user_data.email and user_data.email.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is required")
        if not (user_data.password and len(user_data.password) >= 8):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must be at least 8 characters long")

        # The acting admin's id is passed along as the creator.
        creator_id = current_user.user_id
        new_user = await user_service.create_user(user_data, creator_id)
        logger.info(f"User created successfully by {current_user.username}: {new_user.username}")
        return user_service.convert_to_user_info_response(new_user)
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Validation error creating user: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error creating user: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create user")
@router.get("/users", response_model=UserListResponse)
async def list_users(
    page: int = 1,
    page_size: int = 20,
    status_filter: Optional[UserStatus] = None,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Return one page of users, optionally filtered by status. Admin only.

    A ``page_size`` above ``settings.MAX_PAGE_SIZE`` is silently reduced to
    that maximum; the response reports the effective page size.

    Raises:
        HTTPException: 400 - Non-positive page / page size, or ValueError from the service
        HTTPException: 403 - Insufficient permissions (admin dependency)
        HTTPException: 500 - Database or server error
    """
    try:
        # Pagination bounds.
        if page < 1:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Page number must be greater than 0")
        if page_size < 1:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Page size must be greater than 0")

        # Oversized pages are clamped rather than rejected.
        if page_size > settings.MAX_PAGE_SIZE:
            logger.info(f"Page size {page_size} exceeds max, setting to {settings.MAX_PAGE_SIZE}")
            page_size = settings.MAX_PAGE_SIZE

        users, total_count = await user_service.list_users(page, page_size, status_filter)
        serialized = list(map(user_service.convert_to_user_info_response, users))
        return UserListResponse(users=serialized, total_count=total_count, page=page, page_size=page_size)
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Validation error listing users: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error listing users: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve users")
@router.post("/users/list")
async def list_users_with_projection(
    payload: UserListRequest,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    List users with optional filters, pagination, and field projection.

    Filtering is available by status, role, merchant_id and merchant_type.
    When a projection list is supplied the service result is returned as-is;
    otherwise each user is converted to the standard user-info response.

    **Request Body:**
    - `filters`: Additional filter criteria
    - `skip`: Number of records to skip (must be 0 or greater)
    - `limit`: Maximum records to return (must be at least 1; values above 1000 are reduced to 1000)
    - `projection_list`: List of fields to include in response
    - `status_filter`: Filter by user status (active, inactive, suspended, etc.)
    - `role_filter`: Filter by user role
    - `merchant_id_filter`: Filter by merchant ID
    - `merchant_type_filter`: Filter by merchant type (ncnf, cnf, distributor, retail)

    **Projection Fields Available:**
    user_id, username, email, merchant_id, merchant_type, first_name, last_name,
    role, status, permissions, last_login_at, created_at, updated_at, phone, timezone, language

    **Benefits:**
    - Reduced payload size (50-90% reduction possible)
    - Better performance with field projection
    - Flexible filtering options

    Raises:
        HTTPException: 400 - Invalid parameters, or ValueError from the service
        HTTPException: 403 - Insufficient permissions (admin dependency)
        HTTPException: 500 - Database or server error
    """
    try:
        # Window validation; an oversized limit is clamped, not rejected.
        if payload.limit < 1:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Limit must be greater than 0")
        if payload.limit > 1000:
            logger.info(f"Limit {payload.limit} exceeds max 1000, setting to 1000")
            payload.limit = 1000
        if payload.skip < 0:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Skip must be 0 or greater")

        users = await user_service.list_users_with_projection(
            filters=payload.filters,
            skip=payload.skip,
            limit=payload.limit,
            projection_list=payload.projection_list,
            status_filter=payload.status_filter,
            role_filter=payload.role_filter,
            merchant_id_filter=payload.merchant_id_filter,
            merchant_type_filter=payload.merchant_type_filter,
        )

        # With a projection the service output is passed through untouched;
        # without one every user goes through the response converter.
        if payload.projection_list:
            records = users
            projected = True
        else:
            records = [user_service.convert_to_user_info_response(user) for user in users]
            projected = False

        body = {
            "success": True,
            "data": records,
            "count": len(records),
            "projection_applied": projected,
        }
        if projected:
            body["projected_fields"] = payload.projection_list
        return body
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Validation error in list_users_with_projection: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error listing users with projection: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve users")
@router.get("/users/{user_id}", response_model=UserInfoResponse)
async def get_user_by_id(
    user_id: str,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Fetch a single user by identifier. Admin only.

    Raises:
        HTTPException: 400 - Blank user ID
        HTTPException: 403 - Insufficient permissions (admin dependency)
        HTTPException: 404 - No user with that ID
        HTTPException: 500 - Database or server error
    """
    try:
        # A blank or whitespace-only identifier is rejected outright.
        if not (user_id and user_id.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User ID is required")

        user = await user_service.get_user_by_id(user_id)
        if user:
            return user_service.convert_to_user_info_response(user)

        logger.warning(f"User not found: {user_id}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error getting user {user_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve user")
@router.put("/users/{user_id}", response_model=UserInfoResponse)
async def update_user(
    user_id: str,
    update_data: UpdateUserRequest,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Update user information. Requires admin privileges.

    The request must set at least one field explicitly; a body in which no
    field was provided is rejected with 400. A ``ValueError`` raised by the
    service is reported as 400 with the error text as detail.

    Raises:
        HTTPException: 400 - Invalid data or user ID
        HTTPException: 403 - Insufficient permissions
        HTTPException: 404 - User not found
        HTTPException: 500 - Database or server error
    """
    try:
        # Validate user_id
        if not user_id or not user_id.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="User ID is required"
            )

        # Check if any data to update. Pydantic v2 renamed ``.dict()`` to
        # ``.model_dump()`` and deprecates the old name; prefer the new method
        # when the model has it and fall back to ``.dict()`` on Pydantic v1.
        dump_fields = getattr(update_data, "model_dump", None) or update_data.dict
        if not dump_fields(exclude_unset=True):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No data provided for update"
            )

        updated_user = await user_service.update_user(user_id, update_data, current_user.user_id)
        # A falsy result from the service is treated as "no such user".
        if not updated_user:
            logger.warning(f"User not found for update: {user_id}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found"
            )

        logger.info(f"User {user_id} updated by {current_user.username}")
        return user_service.convert_to_user_info_response(updated_user)
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Validation error updating user {user_id}: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )
    except Exception as e:
        logger.error(f"Unexpected error updating user {user_id}: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to update user"
        )
@router.put("/change-password", response_model=StandardResponse)
async def change_password(
    password_data: ChangePasswordRequest,
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Change the password of the authenticated user.

    Both passwords must be non-blank, the new one at least eight characters
    long and different from the current one. A falsy result from the service
    is reported as an incorrect current password.

    Raises:
        HTTPException: 400 - Missing/short/unchanged password, or current password incorrect
        HTTPException: 401 - Unauthorized (raised by the auth dependency)
        HTTPException: 500 - Database or server error
    """
    try:
        current_pw = password_data.current_password
        new_pw = password_data.new_password

        # Checks run in a fixed order so the first failing rule is reported.
        if not (current_pw and current_pw.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is required")
        if not (new_pw and new_pw.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password is required")
        if len(new_pw) < 8:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be at least 8 characters long")
        if current_pw == new_pw:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be different from current password")

        changed = await user_service.change_password(
            user_id=current_user.user_id,
            current_password=current_pw,
            new_password=new_pw,
        )
        if changed:
            logger.info(f"Password changed successfully for user {current_user.username}")
            return StandardResponse(success=True, message="Password changed successfully")

        logger.warning(f"Failed password change attempt for user {current_user.user_id}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error changing password for user {current_user.user_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to change password")
@router.post("/forgot-password", response_model=StandardResponse)
async def forgot_password(
    request_data: ForgotPasswordRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Start the password-reset flow for an email address.

    The success message is the same whether or not the address is known, so
    the response does not reveal which emails are registered.

    Raises:
        HTTPException: 400 - Email missing or blank
        HTTPException: 500 - Server error
    """
    try:
        if not (request_data.email and request_data.email.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is required")

        # The service call's result is intentionally ignored: the reply must
        # not depend on whether the address exists.
        await user_service.send_password_reset_email(request_data.email)
        logger.info(f"Password reset requested for email: {request_data.email}")

        reply = StandardResponse(
            success=True,
            message="If the email exists in our system, a password reset link has been sent",
        )
        return reply
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error in forgot password: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to process password reset request")
@router.post("/verify-reset-token", response_model=StandardResponse)
async def verify_reset_token(
    request_data: VerifyResetTokenRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Check whether a password-reset token is still usable.

    Intended to be called before the reset form is shown. On success the
    response data carries the ``email`` entry of the verified token payload.

    Raises:
        HTTPException: 400 - Token missing, invalid or expired
        HTTPException: 500 - Server error
    """
    try:
        if not (request_data.token and request_data.token.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token is required")

        token_info = await user_service.verify_password_reset_token(request_data.token)
        if token_info:
            return StandardResponse(
                success=True,
                message="Reset token is valid",
                data={"email": token_info.get("email")},
            )

        # A falsy result from the service means the token was not accepted.
        logger.warning("Invalid or expired reset token verification attempt")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset token")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error verifying reset token: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to verify reset token")
@router.post("/reset-password", response_model=StandardResponse)
async def reset_password(
    request_data: ResetPasswordRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Set a new password using a reset token.

    The token and the new password must be non-blank and the password at
    least eight characters long. Token validation itself is delegated to the
    service; when it reports failure its message is returned as the 400 detail.

    Raises:
        HTTPException: 400 - Missing token/password, short password, or reset refused
        HTTPException: 500 - Server error
    """
    try:
        reset_token = request_data.token
        new_pw = request_data.new_password

        if not (reset_token and reset_token.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token is required")
        if not (new_pw and new_pw.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="New password is required")
        if len(new_pw) < 8:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must be at least 8 characters long")

        ok, message = await user_service.reset_password_with_token(
            token=reset_token,
            new_password=new_pw,
        )
        if not ok:
            logger.warning(f"Password reset failed: {message}")
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)

        logger.info("Password reset completed successfully")
        return StandardResponse(
            success=True,
            message="Password has been reset successfully. You can now login with your new password.",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error resetting password: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to reset password")
@router.delete("/users/{user_id}", response_model=StandardResponse)
async def deactivate_user(
    user_id: str,
    current_user: SystemUserModel = Depends(require_admin_role),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Deactivate a user account. Admin only.

    An administrator cannot deactivate their own account. A falsy result
    from the service is reported as "user not found".

    Raises:
        HTTPException: 400 - Blank user ID, or attempt to deactivate own account
        HTTPException: 403 - Insufficient permissions (admin dependency)
        HTTPException: 404 - User not found
        HTTPException: 500 - Database or server error
    """
    try:
        if not (user_id and user_id.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User ID is required")

        # Guard against an admin deactivating their own account.
        acting_admin_id = current_user.user_id
        if user_id == acting_admin_id:
            logger.warning(f"User {current_user.username} attempted to deactivate their own account")
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account")

        deactivated = await user_service.deactivate_user(user_id, current_user.user_id)
        if deactivated:
            logger.info(f"User {user_id} deactivated by {current_user.username}")
            return StandardResponse(success=True, message="User deactivated successfully")

        logger.warning(f"User not found for deactivation: {user_id}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error deactivating user {user_id}: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to deactivate user")
@router.post("/logout", response_model=StandardResponse)
async def logout(
    request: Request,
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Log out the authenticated user.

    Requires a JWT in the Authorization header (Bearer token). The logout is
    handed to the service for audit recording together with the caller's IP
    address and User-Agent header.

    **Security:**
    - The JWT is validated (by the auth dependency) before logout
    - The logout event is recorded with user, IP address and user agent
    - A structured log entry is emitted for the logout

    **Note:** Because the tokens are stateless, this endpoint does not
    invalidate the token. The client is responsible for:
    - Removing the token from local storage/cookies
    - Clearing any cached user data
    - Redirecting to the login page

    For enhanced security in production:
    - Consider implementing token blacklisting
    - Use short-lived access tokens with refresh tokens
    - Implement server-side session management if needed

    Raises:
        HTTPException: 401 - Unauthorized (invalid or missing token)
        HTTPException: 500 - Server error
    """
    try:
        # Request metadata for the audit record.
        client_ip = None
        if request.client:
            client_ip = request.client.host
        agent_header = request.headers.get("User-Agent")

        await user_service.record_logout(user=current_user, ip_address=client_ip, user_agent=agent_header)

        audit_fields = {
            "event": "logout_success",
            "user_id": current_user.user_id,
            "username": current_user.username,
            "ip_address": client_ip,
        }
        logger.info(f"User logged out successfully: {current_user.username}", extra=audit_fields)
        return StandardResponse(success=True, message="Logged out successfully")
    except AttributeError as e:
        # Raised when the user (or request) object lacks an expected attribute.
        logger.error(
            f"Error accessing user during logout: {e}",
            extra={"error_type": "attribute_error"},
            exc_info=True,
        )
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error during logout")
    except Exception as e:
        failure_fields = {
            "error_type": type(e).__name__,
            "user_id": current_user.user_id if current_user else None,
        }
        logger.error(f"Unexpected logout error: {str(e)}", extra=failure_fields, exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred during logout")
# Create default super admin endpoint (for initial setup)
@router.post("/setup/super-admin", response_model=UserInfoResponse)
async def create_super_admin(
    user_data: CreateUserRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Bootstrap the first super admin account.

    The endpoint takes no authentication dependency. It succeeds only while
    the user listing reports zero users. The requested role is overwritten
    with ``super_admin`` and the creator is recorded as ``"system"``.

    Raises:
        HTTPException: 400 - Invalid data, or ValueError raised by the service
        HTTPException: 403 - Users already exist in the system
        HTTPException: 500 - Database or server error
    """
    try:
        # Field checks, in the same order the errors are reported.
        if not (user_data.username and user_data.username.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username is required")
        if not (user_data.email and user_data.email.strip()):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Email is required")
        if not (user_data.password and len(user_data.password) >= 8):
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Password must be at least 8 characters long")

        # Only the total matters here; the returned page of users is unused.
        try:
            _, total_count = await user_service.list_users(page=1, page_size=1)
        except Exception as db_error:
            logger.error(f"Database error checking existing users: {db_error}", exc_info=True)
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to verify system state")

        if total_count > 0:
            logger.warning("Attempted to create super admin when users already exist")
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Super admin already exists or users are present in system",
            )

        # Whatever role the request carried, the bootstrap account is a super admin.
        user_data.role = "super_admin"
        super_admin = await user_service.create_user(user_data, "system")
        logger.info(f"Super admin created: {super_admin.username}")
        return user_service.convert_to_user_info_response(super_admin)
    except HTTPException:
        raise
    except ValueError as e:
        logger.error(f"Validation error creating super admin: {e}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error creating super admin: {str(e)}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create super admin")