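# Spaces: Running
# (The line above appears to be page-header residue captured with this listing; it is not part of the module.)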
"""
Authentication router for login, logout, and token management endpoints.

Provides JWT-based authentication with enhanced security features.
"""
from datetime import timedelta
from typing import Optional, List, Dict

from fastapi import APIRouter, Depends, HTTPException, status, Body, Request
from pydantic import BaseModel, EmailStr

from app.system_users.services.service import SystemUserService
from app.dependencies.auth import get_system_user_service, get_current_user
from app.system_users.models.model import SystemUserModel
from app.core.config import settings
from app.core.logging import get_logger

# Customer auth imports moved to customer_router.py

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Router for this module; its routes are grouped under the "/auth" prefix
# and tagged "Authentication".
router = APIRouter(
    prefix="/auth",
    tags=["Authentication"],
)
| def _get_accessible_widgets(user_role) -> List[Dict]: | |
| """Generate accessible widgets based on user role for authentication system.""" | |
| # Base widgets available to all roles - Authentication focused | |
| base_widgets = [ | |
| { | |
| "widget_id": "wid_login_count_001", | |
| "widget_type": "kpi", | |
| "title": "Login Count", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_active_users_001", | |
| "widget_type": "kpi", | |
| "title": "Active Users", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_failed_logins_001", | |
| "widget_type": "kpi", | |
| "title": "Failed Logins (24h)", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_user_roles_001", | |
| "widget_type": "chart", | |
| "title": "User Roles Distribution", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_login_trend_001", | |
| "widget_type": "chart", | |
| "title": "Login Trend (7 days)", | |
| "accessible": True | |
| } | |
| ] | |
| # Advanced widgets for managers and above | |
| advanced_widgets = [ | |
| { | |
| "widget_id": "wid_security_events_001", | |
| "widget_type": "table", | |
| "title": "Recent Security Events", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_locked_accounts_001", | |
| "widget_type": "table", | |
| "title": "Locked Accounts", | |
| "accessible": True | |
| }, | |
| { | |
| "widget_id": "wid_recent_registrations_001", | |
| "widget_type": "table", | |
| "title": "Recent User Registrations", | |
| "accessible": True | |
| } | |
| ] | |
| # Return widgets based on role | |
| if user_role in ["super_admin", "admin", "role_super_admin", "role_company_admin"]: | |
| return base_widgets + advanced_widgets | |
| elif user_role in ["manager", "role_cnf_manager"]: | |
| return base_widgets + advanced_widgets[:2] # Limited advanced widgets | |
| else: | |
| return base_widgets # Basic widgets only | |
class LoginRequest(BaseModel):
    """Credentials submitted to the login handler."""

    # Identifier the user signs in with: email, phone number, or username.
    email_or_phone: str

    # Password supplied with the request.
    password: str
class LoginResponse(BaseModel):
    """Payload returned by a successful login."""

    # JWT access token.
    access_token: str

    # JWT refresh token.
    refresh_token: str

    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"

    # Access-token lifetime; the login handler fills this in seconds.
    expires_in: int

    # Basic, non-sensitive user fields assembled by the login handler.
    user: dict

    # Role permissions plus the accessible-widget list.
    access_menu: dict

    # Optional password-rotation message; None when there is nothing to report.
    warnings: Optional[str] = None
class TokenRefreshRequest(BaseModel):
    """Request body for exchanging a refresh token for a new access token."""

    # Refresh token previously issued to the client.
    refresh_token: str
def _build_password_rotation_warning(user, rotation_status) -> Optional[str]:
    """Return the user-facing password-rotation message for a login response.

    Logs the corresponding rotation event as a side effect. Returns ``None``
    when ``rotation_status["password_status"]`` is neither ``"expired"`` nor
    ``"warning"``.
    """
    password_status = rotation_status["password_status"]

    if password_status == "expired":
        age_days = rotation_status["password_age_days"]
        logger.warning(
            f"Password rotation required for user: {user.username}",
            extra={
                "event": "password_rotation_required",
                "user_id": user.user_id,
                "password_age_days": age_days
            }
        )
        return (
            "Your password has expired. Please change your password immediately. "
            f"Your password is {age_days} days old."
        )

    if password_status == "warning":
        days_until_expiry = rotation_status["days_until_expiry"]
        logger.info(
            f"Password rotation warning for user: {user.username}",
            extra={
                "event": "password_rotation_warning",
                "user_id": user.user_id,
                "days_until_expiry": days_until_expiry
            }
        )
        return (
            f"Your password will expire in {days_until_expiry} day(s). "
            "Please change your password soon."
        )

    return None


# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def login(
    request: Request,
    login_data: LoginRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Authenticate user and return JWT tokens.

    - **email_or_phone**: User email, phone number, or username
    - **password**: User password

    Returns:
        LoginResponse containing the access and refresh tokens, the access
        token lifetime in seconds, basic user info, the access menu
        (role permissions plus accessible widgets) and an optional
        password-rotation warning.

    Raises:
        HTTPException: 400 - Identifier or password is missing or blank
        HTTPException: 401 - Authentication failed (detail is the message
            returned by the user service)
        HTTPException: 500 - Token generation failure or unexpected server error
    """
    try:
        logger.info(f"Login attempt for: {login_data.email_or_phone}")

        # Validate input: blank or whitespace-only values are rejected before
        # the user service is called.
        if not login_data.email_or_phone or not login_data.email_or_phone.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email, phone, or username is required"
            )
        if not login_data.password or not login_data.password.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Password is required"
            )

        # Get client IP and user agent for security tracking
        client_ip = request.client.host if request.client else None
        user_agent = request.headers.get("User-Agent")

        # Authenticate user
        user, message = await user_service.authenticate_user(
            login_data.email_or_phone,
            login_data.password,
            ip_address=client_ip,
            user_agent=user_agent
        )
        if not user:
            logger.warning(f"Login failed for {login_data.email_or_phone}: {message}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=message,
                headers={"WWW-Authenticate": "Bearer"}
            )
        logger.info(f"User authenticated: {user.username}, role: {user.role}")

        # Permissions are best-effort: a lookup failure is logged and the
        # login continues with no permissions rather than failing.
        try:
            scm_permissions = await user_service.get_scm_permissions_by_role(user.role)
            if scm_permissions:
                logger.info(f"SCM permissions loaded: {list(scm_permissions.keys())}")
            else:
                logger.warning(f"No SCM permissions found for role: {user.role}")
        except Exception as perm_error:
            logger.error(f"Error fetching permissions: {perm_error}")
            scm_permissions = None

        # Create tokens
        try:
            access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
            access_token = user_service.create_access_token(
                data={
                    "sub": user.user_id,
                    "username": user.username,
                    "role": user.role,
                    "merchant_id": user.merchant_id,
                    "merchant_type": user.merchant_type
                },
                expires_delta=access_token_expires
            )
            refresh_token = user_service.create_refresh_token(
                data={"sub": user.user_id, "username": user.username}
            )
        except Exception as token_error:
            logger.error(f"Error creating tokens: {token_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to generate authentication tokens"
            ) from token_error

        # Generate accessible widgets based on user role
        accessible_widgets = _get_accessible_widgets(user.role)

        # Return user info without sensitive data
        user_info = {
            "user_id": user.user_id,
            "username": user.username,
            "email": user.email,
            "merchant_id": user.merchant_id,
            "merchant_type": user.merchant_type,
            "full_name": user.full_name,
            "role": user.role,
            "status": user.status.value,
            "last_login_at": user.last_login_at
        }

        # Access menu structure with SCM permissions
        access_menu = {
            "permissions": scm_permissions,
            "accessible_widgets": accessible_widgets
        }

        # Check password rotation policy
        warning_message = _build_password_rotation_warning(
            user,
            user_service.get_password_rotation_status(user)
        )

        logger.info(f"✅ User logged in successfully: {user.username}")
        return LoginResponse(
            access_token=access_token,
            refresh_token=refresh_token,
            expires_in=int(access_token_expires.total_seconds()),
            user=user_info,
            access_menu=access_menu,
            warnings=warning_message
        )
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Unexpected login error for {login_data.email_or_phone}: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during authentication"
        ) from e
class OAuth2LoginRequest(BaseModel):
    """Login request shaped like an OAuth2 password-grant form."""

    # Login identifier; may be an email or a phone number.
    username: str

    # Password supplied with the request.
    password: str

    # OAuth2 grant type; defaults to "password".
    grant_type: str = "password"
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def refresh_token(
    refresh_data: TokenRefreshRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Refresh access token using refresh token.

    Returns:
        dict with ``access_token``, ``token_type`` ("bearer") and
        ``expires_in`` (access-token lifetime in whole seconds).

    Raises:
        HTTPException: 400 - Refresh token is missing or blank
        HTTPException: 401 - Token invalid or expired, payload has no subject,
            user not found, or user is not active
        HTTPException: 500 - User lookup failure, token generation failure,
            or unexpected server error
    """
    # Challenge header sent with every 401 so the responses are consistent.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}

    try:
        # Validate input
        if not refresh_data.refresh_token or not refresh_data.refresh_token.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Refresh token is required"
            )

        # Verify refresh token. Only the verification call is guarded, so the
        # 401 raised below for a None payload is not caught, logged as a
        # verification failure, and re-wrapped by this handler.
        try:
            payload = user_service.verify_token(refresh_data.refresh_token, "refresh")
        except Exception as verify_error:
            logger.warning(f"Token verification failed: {verify_error}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired refresh token",
                headers=bearer_challenge
            ) from verify_error

        if payload is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired refresh token",
                headers=bearer_challenge
            )

        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload",
                headers=bearer_challenge
            )

        # Get user to verify they still exist and are active
        try:
            user = await user_service.get_user_by_id(user_id)
        except Exception as db_error:
            logger.error(f"Database error fetching user {user_id}: {db_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to verify user status"
            ) from db_error

        if not user:
            logger.warning(f"Token refresh attempted for non-existent user: {user_id}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found",
                headers=bearer_challenge
            )

        if user.status.value != "active":
            logger.warning(f"Token refresh attempted for inactive user: {user_id}, status: {user.status.value}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"User account is {user.status.value}",
                headers=bearer_challenge
            )

        # Role and merchant claims come from the freshly loaded user, so the
        # username does too; the refresh-token claim is only a fallback
        # because it may be stale or absent.
        username = getattr(user, "username", None) or payload.get("username")

        # Create new access token
        try:
            access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
            new_access_token = user_service.create_access_token(
                data={
                    "sub": user_id,
                    "username": username,
                    "role": user.role,
                    "merchant_id": user.merchant_id,
                    "merchant_type": user.merchant_type
                },
                expires_delta=access_token_expires
            )
        except Exception as token_error:
            logger.error(f"Error creating new access token: {token_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to generate new access token"
            ) from token_error

        logger.info(f"Token refreshed successfully for user: {username}")
        return {
            "access_token": new_access_token,
            "token_type": "bearer",
            # Same computation as the login handler; always an int even if
            # the configured hour count is fractional.
            "expires_in": int(access_token_expires.total_seconds())
        }
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Unexpected token refresh error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during token refresh"
        ) from e
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def get_current_user_info(
    current_user: SystemUserModel = Depends(get_current_user)
):
    """
    Return profile fields of the authenticated user.

    The user is supplied by the ``get_current_user`` dependency; this handler
    only copies selected attributes into a plain dict.

    Raises:
        HTTPException: 401 - Unauthorized (presumably raised by the
            ``get_current_user`` dependency for an invalid or missing token;
            not raised in this function)
        HTTPException: 500 - An attribute could not be read from the user
            object, or another unexpected error occurred
    """
    try:
        profile = dict(
            user_id=current_user.user_id,
            username=current_user.username,
            email=current_user.email,
            full_name=current_user.full_name,
            role=current_user.role,
            permissions=current_user.permissions,
            # ``status`` is read through ``.value``, i.e. it is expected to be
            # enum-like.
            status=current_user.status.value,
            last_login_at=current_user.last_login_at,
            timezone=current_user.timezone,
            language=current_user.language,
        )
        return profile
    except AttributeError as e:
        # A missing attribute on the user object is reported separately from
        # other failures.
        logger.error(f"Error accessing user attributes: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving user information",
        )
    except Exception as e:
        logger.error(f"Unexpected error getting current user info: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred",
        )
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def logout(
    request: Request,
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Log out the current user and record the event.

    The authenticated user comes from the ``get_current_user`` dependency.
    The handler passes the user, the client IP address and the ``User-Agent``
    header to ``user_service.record_logout`` and then returns a success
    payload.

    **Note:** Nothing in this handler invalidates the token itself. With
    stateless JWTs the client is responsible for:
    - Removing the token from local storage/cookies
    - Clearing any cached user data
    - Redirecting to login page

    For enhanced security in production:
    - Consider implementing token blacklisting
    - Use short-lived access tokens with refresh tokens
    - Implement server-side session management if needed

    Raises:
        HTTPException: 401 - Unauthorized (presumably raised by the
            ``get_current_user`` dependency; not raised in this function)
        HTTPException: 500 - Server error
    """
    try:
        # Client details forwarded to the logout record.
        peer = request.client
        client_ip = peer.host if peer else None
        user_agent = request.headers.get("User-Agent")

        # Record logout for audit purposes
        await user_service.record_logout(
            user=current_user,
            ip_address=client_ip,
            user_agent=user_agent,
        )

        success_context = {
            "event": "logout_success",
            "user_id": current_user.user_id,
            "username": current_user.username,
            "ip_address": client_ip,
        }
        logger.info(
            f"User logged out successfully: {current_user.username}",
            extra=success_context,
        )

        return dict(success=True, message="Successfully logged out")
    except AttributeError as e:
        logger.error(
            f"Error accessing user during logout: {e}",
            extra={"error_type": "attribute_error"},
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error during logout",
        )
    except Exception as e:
        failure_context = {
            "error_type": type(e).__name__,
            # Guarded so error logging cannot itself fail on a falsy user.
            "user_id": current_user.user_id if current_user else None,
        }
        logger.error(
            f"Unexpected logout error: {str(e)}",
            extra=failure_context,
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during logout",
        )
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
# SECURITY NOTE(review): this handler returns account emails and passwords
# that are hard-coded in source and takes no authentication dependency. If
# these accounts exist in any deployed environment the endpoint discloses
# their credentials; confirm it is restricted to, or removed outside of,
# local testing.
async def test_login():
    """
    Report that the authentication system is ready.

    Returns a fixed payload: a readiness message plus a list of sample
    login credentials (type, email, password, description).
    """
    credential_fields = ("type", "email", "password", "description")
    sample_accounts = (
        ("Super Admin", "superadmin@cuatrolabs.com", "SuperAdmin@123", "Full system access"),
        ("Company Admin", "admin@cuatrolabs.com", "CompanyAdmin@123", "Company-wide management"),
        ("Manager", "manager@cuatrolabs.com", "Manager@123", "Team management"),
    )

    # zip() pairs each value with its field name, preserving key order.
    test_credentials = [
        dict(zip(credential_fields, account)) for account in sample_accounts
    ]

    return {
        "message": "Authentication system is ready",
        "test_credentials": test_credentials,
    }
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def get_access_roles(
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    List the available access roles with their permissions.

    Roles are loaded through ``user_service.get_all_roles()``; a ``None``
    result is treated as an empty list. Each role is read with ``.get`` and
    reduced to ``role_id``, ``role_name``, ``description``, ``permissions``
    (default ``{}``) and ``is_active`` (default ``True``).

    Raises:
        HTTPException: 500 - Database or server error
    """

    def _summarize(role):
        # Expose only the fields this endpoint returns.
        return {
            "role_id": role.get("role_id"),
            "role_name": role.get("role_name"),
            "description": role.get("description"),
            "permissions": role.get("permissions", {}),
            "is_active": role.get("is_active", True),
        }

    try:
        # Get roles from database
        roles = await user_service.get_all_roles()
        if roles is None:
            logger.warning("get_all_roles returned None")
            roles = []

        role_count = len(roles)
        role_summaries = [_summarize(role) for role in roles]

        return {
            "success": True,
            "message": "Access roles with grouped permissions structure",
            "total_roles": role_count,
            "roles": role_summaries,
        }
    except Exception as e:
        logger.error(f"Error fetching access roles: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch access roles",
        )
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def get_password_rotation_status(
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Return the current user's password rotation status.

    The status mapping is produced by
    ``user_service.get_password_rotation_status`` and returned unchanged
    under ``data``. This handler itself only reads ``password_status`` and
    ``password_age_days`` (for logging).

    **Response fields** (as documented for this endpoint; produced by the
    service, not verified here):
    - **password_status**: active, warning, or expired
    - **password_age_days**: Days since last password change (-1 if never changed)
    - **password_rotation_days_required**: Number of days before rotation is required
    - **days_until_expiry**: Days remaining before password expires
    - **last_password_change**: Timestamp of last password change
    - **requires_password_change**: Whether user must change password immediately
    - **warning_threshold_days**: Number of days before warning is shown

    Raises:
        HTTPException: 401 - Unauthorized (presumably raised by the
            ``get_current_user`` dependency; not raised in this function)
        HTTPException: 500 - Server error
    """
    try:
        status_info = user_service.get_password_rotation_status(current_user)

        log_context = {
            "user_id": current_user.user_id,
            "password_status": status_info["password_status"],
            "password_age_days": status_info["password_age_days"],
        }
        logger.info(
            f"Password rotation status retrieved for user: {current_user.username}",
            extra=log_context,
        )

        return dict(success=True, data=status_info)
    except Exception as e:
        logger.error(f"Error fetching password rotation status: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch password rotation status",
        )
# NOTE(review): no @router decorator is visible above this handler in this
# listing; confirm it is registered on the router.
async def get_password_rotation_policy(
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Return the password rotation policy read from application settings.

    The handler takes no current-user dependency. ``user_service`` is
    accepted but not used in the body.

    **Response** (under ``policy``):
    - **password_rotation_days**: Days required before password rotation
    - **password_rotation_warning_days**: Days before expiry to show warning
    - **enforce_password_rotation**: Whether rotation is enforced
    - **allow_login_with_expired_password**: Whether users can login with expired password
    - **description**: Human-readable summary of the two day counts

    Raises:
        HTTPException: 500 - Server error
    """
    try:
        # Human-readable summary built from the same settings values.
        summary = (
            f"Users must change their password every {settings.PASSWORD_ROTATION_DAYS} days. "
            f"A warning is shown {settings.PASSWORD_ROTATION_WARNING_DAYS} days before expiration."
        )

        policy_details = {
            "password_rotation_days": settings.PASSWORD_ROTATION_DAYS,
            "password_rotation_warning_days": settings.PASSWORD_ROTATION_WARNING_DAYS,
            "enforce_password_rotation": settings.ENFORCE_PASSWORD_ROTATION,
            "allow_login_with_expired_password": settings.ALLOW_LOGIN_WITH_EXPIRED_PASSWORD,
            "description": summary,
        }

        logger.debug("Password rotation policy retrieved")
        return {"success": True, "policy": policy_details}
    except Exception as e:
        logger.error(f"Error fetching password rotation policy: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch password rotation policy",
        )