""" Authentication router for login, logout, and token management endpoints. Provides JWT-based authentication with enhanced security features. """ from datetime import timedelta from typing import Optional, List, Dict from fastapi import APIRouter, Depends, HTTPException, status, Body, Request from pydantic import BaseModel, EmailStr from app.system_users.services.service import SystemUserService from app.dependencies.auth import get_system_user_service, get_current_user from app.system_users.models.model import SystemUserModel from app.core.config import settings from app.core.logging import get_logger # Customer auth imports moved to customer_router.py logger = get_logger(__name__) router = APIRouter(prefix="/auth", tags=["Authentication"]) def _get_accessible_widgets(user_role) -> List[Dict]: """Generate accessible widgets based on user role for authentication system.""" # Base widgets available to all roles - Authentication focused base_widgets = [ { "widget_id": "wid_login_count_001", "widget_type": "kpi", "title": "Login Count", "accessible": True }, { "widget_id": "wid_active_users_001", "widget_type": "kpi", "title": "Active Users", "accessible": True }, { "widget_id": "wid_failed_logins_001", "widget_type": "kpi", "title": "Failed Logins (24h)", "accessible": True }, { "widget_id": "wid_user_roles_001", "widget_type": "chart", "title": "User Roles Distribution", "accessible": True }, { "widget_id": "wid_login_trend_001", "widget_type": "chart", "title": "Login Trend (7 days)", "accessible": True } ] # Advanced widgets for managers and above advanced_widgets = [ { "widget_id": "wid_security_events_001", "widget_type": "table", "title": "Recent Security Events", "accessible": True }, { "widget_id": "wid_locked_accounts_001", "widget_type": "table", "title": "Locked Accounts", "accessible": True }, { "widget_id": "wid_recent_registrations_001", "widget_type": "table", "title": "Recent User Registrations", "accessible": True } ] # Return widgets based on role if user_role in ["super_admin", "admin", "role_super_admin", "role_company_admin"]: return base_widgets + advanced_widgets elif user_role in ["manager", "role_cnf_manager"]: return base_widgets + advanced_widgets[:2] # Limited advanced widgets else: return base_widgets # Basic widgets only class LoginRequest(BaseModel): """Login request model.""" email_or_phone: str # Can be email, phone number, or username password: str class LoginResponse(BaseModel): """Login response model.""" access_token: str refresh_token: str token_type: str = "bearer" expires_in: int user: dict access_menu: dict warnings: Optional[str] = None class TokenRefreshRequest(BaseModel): """Token refresh request.""" refresh_token: str @router.post("/login", response_model=LoginResponse) async def login( request: Request, login_data: LoginRequest, user_service: SystemUserService = Depends(get_system_user_service) ): """ Authenticate user and return JWT tokens. - **email_or_phone**: User email, phone number, or username - **password**: User password Raises: HTTPException: 401 - Invalid credentials or account locked HTTPException: 500 - Database or server error """ try: logger.info(f"Login attempt for: {login_data.email_or_phone}") # Validate input if not login_data.email_or_phone or not login_data.email_or_phone.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email, phone, or username is required" ) if not login_data.password or not login_data.password.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Password is required" ) # Get client IP and user agent for security tracking client_ip = request.client.host if request.client else None user_agent = request.headers.get("User-Agent") # Authenticate user user, message = await user_service.authenticate_user( login_data.email_or_phone, login_data.password, ip_address=client_ip, user_agent=user_agent ) if not user: logger.warning(f"Login failed for {login_data.email_or_phone}: {message}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=message, headers={"WWW-Authenticate": "Bearer"} ) logger.info(f"User authenticated: {user.username}, role: {user.role}") # Fetch permissions from SCM access roles collection based on user role try: scm_permissions = await user_service.get_scm_permissions_by_role(user.role) if scm_permissions: logger.info(f"SCM permissions loaded: {list(scm_permissions.keys())}") else: logger.warning(f"No SCM permissions found for role: {user.role}") except Exception as perm_error: logger.error(f"Error fetching permissions: {perm_error}") scm_permissions = None # Create tokens try: access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) access_token = user_service.create_access_token( data={"sub": user.user_id, "username": user.username, "role": user.role, "merchant_id": user.merchant_id, "merchant_type": user.merchant_type}, expires_delta=access_token_expires ) refresh_token = user_service.create_refresh_token( data={"sub": user.user_id, "username": user.username} ) except Exception as token_error: logger.error(f"Error creating tokens: {token_error}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate authentication tokens" ) # Generate accessible widgets based on user role accessible_widgets = _get_accessible_widgets(user.role) # Return user info without sensitive data user_info = { "user_id": user.user_id, "username": user.username, "email": user.email, "merchant_id": user.merchant_id, "merchant_type": user.merchant_type, "full_name": user.full_name, "role": user.role, "status": user.status.value, "last_login_at": user.last_login_at } # Access menu structure with SCM permissions access_menu = { "permissions": scm_permissions, "accessible_widgets": accessible_widgets } # Check password rotation policy password_rotation_status = user_service.get_password_rotation_status(user) warning_message = None if password_rotation_status["password_status"] == "expired": warning_message = f"Your password has expired. Please change your password immediately. Your password is {password_rotation_status['password_age_days']} days old." logger.warning( f"Password rotation required for user: {user.username}", extra={ "event": "password_rotation_required", "user_id": user.user_id, "password_age_days": password_rotation_status["password_age_days"] } ) elif password_rotation_status["password_status"] == "warning": days_until_expiry = password_rotation_status["days_until_expiry"] warning_message = f"Your password will expire in {days_until_expiry} day(s). Please change your password soon." logger.info( f"Password rotation warning for user: {user.username}", extra={ "event": "password_rotation_warning", "user_id": user.user_id, "days_until_expiry": days_until_expiry } ) logger.info(f"✅ User logged in successfully: {user.username}") return LoginResponse( access_token=access_token, refresh_token=refresh_token, expires_in=int(access_token_expires.total_seconds()), user=user_info, access_menu=access_menu, warnings=warning_message ) except HTTPException: raise except Exception as e: logger.error(f"Unexpected login error for {login_data.email_or_phone}: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred during authentication" ) class OAuth2LoginRequest(BaseModel): """OAuth2 compatible login request.""" username: str # Can be email or phone password: str grant_type: str = "password" @router.post("/refresh") async def refresh_token( refresh_data: TokenRefreshRequest, user_service: SystemUserService = Depends(get_system_user_service) ): """ Refresh access token using refresh token. Raises: HTTPException: 400 - Missing or invalid refresh token HTTPException: 401 - Token expired or user inactive HTTPException: 500 - Server error """ try: # Validate input if not refresh_data.refresh_token or not refresh_data.refresh_token.strip(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Refresh token is required" ) # Verify refresh token try: payload = user_service.verify_token(refresh_data.refresh_token, "refresh") if payload is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token", headers={"WWW-Authenticate": "Bearer"} ) except Exception as verify_error: logger.warning(f"Token verification failed: {verify_error}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token", headers={"WWW-Authenticate": "Bearer"} ) user_id = payload.get("sub") username = payload.get("username") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload" ) # Get user to verify they still exist and are active try: user = await user_service.get_user_by_id(user_id) except Exception as db_error: logger.error(f"Database error fetching user {user_id}: {db_error}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to verify user status" ) if not user: logger.warning(f"Token refresh attempted for non-existent user: {user_id}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" ) if user.status.value != "active": logger.warning(f"Token refresh attempted for inactive user: {user_id}, status: {user.status.value}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"User account is {user.status.value}" ) # Create new access token try: access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) new_access_token = user_service.create_access_token( data={"sub": user_id, "username": username, "role": user.role, "merchant_id": user.merchant_id, "merchant_type": user.merchant_type}, expires_delta=access_token_expires ) except Exception as token_error: logger.error(f"Error creating new access token: {token_error}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate new access token" ) logger.info(f"Token refreshed successfully for user: {username}") return { "access_token": new_access_token, "token_type": "bearer", "expires_in": settings.TOKEN_EXPIRATION_HOURS * 3600 } except HTTPException: raise except Exception as e: logger.error(f"Unexpected token refresh error: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred during token refresh" ) @router.get("/me") async def get_current_user_info( current_user: SystemUserModel = Depends(get_current_user) ): """ Get current user information. Raises: HTTPException: 401 - Unauthorized (invalid or missing token) """ try: return { "user_id": current_user.user_id, "username": current_user.username, "email": current_user.email, "full_name": current_user.full_name, "role": current_user.role, "permissions": current_user.permissions, "status": current_user.status.value, "last_login_at": current_user.last_login_at, "timezone": current_user.timezone, "language": current_user.language } except AttributeError as e: logger.error(f"Error accessing user attributes: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error retrieving user information" ) except Exception as e: logger.error(f"Unexpected error getting current user info: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred" ) @router.post("/logout") async def logout( request: Request, current_user: SystemUserModel = Depends(get_current_user), user_service: SystemUserService = Depends(get_system_user_service) ): """ Logout current user. Requires JWT token in Authorization header (Bearer token). Logs out the user and records the logout event for audit purposes. **Security:** - Validates JWT token before logout - Records logout event with IP address, user agent, and session duration - Stores audit log for compliance and security tracking **Note:** Since we're using stateless JWT tokens, the client is responsible for: - Removing the token from local storage/cookies - Clearing any cached user data - Redirecting to login page For enhanced security in production: - Consider implementing token blacklisting - Use short-lived access tokens with refresh tokens - Implement server-side session management if needed Raises: HTTPException: 401 - Unauthorized (invalid or missing token) HTTPException: 500 - Server error """ try: # Get client information for audit logging client_ip = request.client.host if request.client else None user_agent = request.headers.get("User-Agent") # Record logout for audit purposes await user_service.record_logout( user=current_user, ip_address=client_ip, user_agent=user_agent ) logger.info( f"User logged out successfully: {current_user.username}", extra={ "event": "logout_success", "user_id": current_user.user_id, "username": current_user.username, "ip_address": client_ip } ) return { "success": True, "message": "Successfully logged out" } except AttributeError as e: logger.error( f"Error accessing user during logout: {e}", extra={"error_type": "attribute_error"}, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error during logout" ) except Exception as e: logger.error( f"Unexpected logout error: {str(e)}", extra={ "error_type": type(e).__name__, "user_id": current_user.user_id if current_user else None }, exc_info=True ) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="An unexpected error occurred during logout" ) @router.post("/test-login") async def test_login(): """ Test endpoint to verify authentication system is working. Returns sample login credentials. """ return { "message": "Authentication system is ready", "test_credentials": [ { "type": "Super Admin", "email": "superadmin@cuatrolabs.com", "password": "SuperAdmin@123", "description": "Full system access" }, { "type": "Company Admin", "email": "admin@cuatrolabs.com", "password": "CompanyAdmin@123", "description": "Company-wide management" }, { "type": "Manager", "email": "manager@cuatrolabs.com", "password": "Manager@123", "description": "Team management" } ] } @router.get("/access-roles") async def get_access_roles( user_service: SystemUserService = Depends(get_system_user_service) ): """ Get available access roles and their permissions structure. Returns the complete role hierarchy with grouped permissions. Raises: HTTPException: 500 - Database or server error """ try: # Get roles from database roles = await user_service.get_all_roles() if roles is None: logger.warning("get_all_roles returned None") roles = [] return { "success": True, "message": "Access roles with grouped permissions structure", "total_roles": len(roles), "roles": [ { "role_id": role.get("role_id"), "role_name": role.get("role_name"), "description": role.get("description"), "permissions": role.get("permissions", {}), "is_active": role.get("is_active", True) } for role in roles ] } except Exception as e: logger.error(f"Error fetching access roles: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to fetch access roles" ) @router.get("/password-rotation-status") async def get_password_rotation_status( current_user: SystemUserModel = Depends(get_current_user), user_service: SystemUserService = Depends(get_system_user_service) ): """ Get current user's password rotation status. Returns password age, expiration information, and rotation requirements. **Response:** - **password_status**: active, warning, or expired - **password_age_days**: Days since last password change (-1 if never changed) - **password_rotation_days_required**: Number of days before rotation is required - **days_until_expiry**: Days remaining before password expires - **last_password_change**: Timestamp of last password change - **requires_password_change**: Whether user must change password immediately - **warning_threshold_days**: Number of days before warning is shown Raises: HTTPException: 401 - Unauthorized (invalid or missing token) HTTPException: 500 - Server error """ try: status_info = user_service.get_password_rotation_status(current_user) logger.info( f"Password rotation status retrieved for user: {current_user.username}", extra={ "user_id": current_user.user_id, "password_status": status_info["password_status"], "password_age_days": status_info["password_age_days"] } ) return { "success": True, "data": status_info } except Exception as e: logger.error(f"Error fetching password rotation status: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to fetch password rotation status" ) @router.post("/password-rotation-policy") async def get_password_rotation_policy( user_service: SystemUserService = Depends(get_system_user_service) ): """ Get password rotation policy information. This endpoint returns the current password rotation policy settings without requiring authentication. **Response:** - **password_rotation_days**: Days required before password rotation - **password_rotation_warning_days**: Days before expiry to show warning - **enforce_password_rotation**: Whether rotation is enforced - **allow_login_with_expired_password**: Whether users can login with expired password Raises: HTTPException: 500 - Server error """ try: policy = { "success": True, "policy": { "password_rotation_days": settings.PASSWORD_ROTATION_DAYS, "password_rotation_warning_days": settings.PASSWORD_ROTATION_WARNING_DAYS, "enforce_password_rotation": settings.ENFORCE_PASSWORD_ROTATION, "allow_login_with_expired_password": settings.ALLOW_LOGIN_WITH_EXPIRED_PASSWORD, "description": f"Users must change their password every {settings.PASSWORD_ROTATION_DAYS} days. " f"A warning is shown {settings.PASSWORD_ROTATION_WARNING_DAYS} days before expiration." } } logger.debug("Password rotation policy retrieved") return policy except Exception as e: logger.error(f"Error fetching password rotation policy: {str(e)}", exc_info=True) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to fetch password rotation policy" )