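# NOTE(review): the three lines below appear to be web-page residue captured
# with the file (not Python); kept as comments so the module can be parsed.
# MukeshKapoor25's picture
# updates
# e032470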
"""
Authentication router for login, logout, and token management endpoints.
Provides JWT-based authentication with enhanced security features.
"""
from datetime import timedelta
from typing import Optional, List, Dict
from fastapi import APIRouter, Depends, HTTPException, status, Body, Request
from pydantic import BaseModel, EmailStr
from app.system_users.services.service import SystemUserService
from app.dependencies.auth import get_system_user_service, get_current_user
from app.system_users.models.model import SystemUserModel
from app.core.config import settings
from app.core.logging import get_logger
# Customer auth imports moved to customer_router.py

# Module logger, created through the project's logging helper with this module's name.
logger = get_logger(__name__)

# Every endpoint in this file is registered on this router under the "/auth" prefix
# and grouped under the "Authentication" tag.
router = APIRouter(prefix="/auth", tags=["Authentication"])
def _get_accessible_widgets(user_role) -> List[Dict]:
"""Generate accessible widgets based on user role for authentication system."""
# Base widgets available to all roles - Authentication focused
base_widgets = [
{
"widget_id": "wid_login_count_001",
"widget_type": "kpi",
"title": "Login Count",
"accessible": True
},
{
"widget_id": "wid_active_users_001",
"widget_type": "kpi",
"title": "Active Users",
"accessible": True
},
{
"widget_id": "wid_failed_logins_001",
"widget_type": "kpi",
"title": "Failed Logins (24h)",
"accessible": True
},
{
"widget_id": "wid_user_roles_001",
"widget_type": "chart",
"title": "User Roles Distribution",
"accessible": True
},
{
"widget_id": "wid_login_trend_001",
"widget_type": "chart",
"title": "Login Trend (7 days)",
"accessible": True
}
]
# Advanced widgets for managers and above
advanced_widgets = [
{
"widget_id": "wid_security_events_001",
"widget_type": "table",
"title": "Recent Security Events",
"accessible": True
},
{
"widget_id": "wid_locked_accounts_001",
"widget_type": "table",
"title": "Locked Accounts",
"accessible": True
},
{
"widget_id": "wid_recent_registrations_001",
"widget_type": "table",
"title": "Recent User Registrations",
"accessible": True
}
]
# Return widgets based on role
if user_role in ["super_admin", "admin", "role_super_admin", "role_company_admin"]:
return base_widgets + advanced_widgets
elif user_role in ["manager", "role_cnf_manager"]:
return base_widgets + advanced_widgets[:2] # Limited advanced widgets
else:
return base_widgets # Basic widgets only
class LoginRequest(BaseModel):
    """Request body accepted by the login endpoint."""

    # Login identifier: an email address, a phone number, or a username.
    email_or_phone: str
    # Password exactly as submitted by the client.
    password: str
class LoginResponse(BaseModel):
    """Response body returned by a successful login."""

    # Issued token pair.
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    # Access-token lifetime in seconds.
    expires_in: int
    # Summary of the authenticated user.
    user: dict
    # Holds the "permissions" and "accessible_widgets" entries built at login.
    access_menu: dict
    # Password-rotation notice, or None when there is nothing to report.
    warnings: Optional[str] = None
class TokenRefreshRequest(BaseModel):
    """Request body accepted by the token refresh endpoint."""

    # Refresh token to exchange for a new access token.
    refresh_token: str
@router.post("/login", response_model=LoginResponse)
async def login(
    request: Request,
    login_data: LoginRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Verify the submitted credentials and issue an access/refresh token pair.

    - **email_or_phone**: User email, phone number, or username
    - **password**: User password

    On success the response also carries a summary of the user, the access
    menu (SCM permissions for the role plus the role's widgets) and, when the
    password is expired or close to expiry, a warning message.

    Raises:
        HTTPException: 400 - Identifier or password is missing or blank
        HTTPException: 401 - The user service rejected the credentials
        HTTPException: 500 - Token generation failed or an unexpected error occurred
    """
    try:
        identifier = login_data.email_or_phone
        secret = login_data.password
        logger.info(f"Login attempt for: {identifier}")

        # Guard clauses: refuse blank input before calling the user service.
        if not (identifier and identifier.strip()):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email, phone, or username is required"
            )
        if not (secret and secret.strip()):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Password is required"
            )

        # Caller metadata is handed to the service for security tracking.
        peer = request.client
        remote_ip = peer.host if peer else None
        agent = request.headers.get("User-Agent")

        user, message = await user_service.authenticate_user(
            identifier,
            secret,
            ip_address=remote_ip,
            user_agent=agent
        )
        if not user:
            logger.warning(f"Login failed for {identifier}: {message}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=message,
                headers={"WWW-Authenticate": "Bearer"}
            )

        logger.info(f"User authenticated: {user.username}, role: {user.role}")

        # Permission lookup is best-effort: any failure is logged and the
        # login proceeds with no permissions attached.
        try:
            scm_permissions = await user_service.get_scm_permissions_by_role(user.role)
            if not scm_permissions:
                logger.warning(f"No SCM permissions found for role: {user.role}")
            else:
                logger.info(f"SCM permissions loaded: {list(scm_permissions.keys())}")
        except Exception as perm_error:
            logger.error(f"Error fetching permissions: {perm_error}")
            scm_permissions = None

        # Token creation failures are reported as a dedicated 500.
        try:
            token_lifetime = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
            access_claims = {
                "sub": user.user_id,
                "username": user.username,
                "role": user.role,
                "merchant_id": user.merchant_id,
                "merchant_type": user.merchant_type,
            }
            issued_access_token = user_service.create_access_token(
                data=access_claims,
                expires_delta=token_lifetime
            )
            refresh_claims = {"sub": user.user_id, "username": user.username}
            issued_refresh_token = user_service.create_refresh_token(data=refresh_claims)
        except Exception as token_error:
            logger.error(f"Error creating tokens: {token_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to generate authentication tokens"
            )

        # Role-based widgets and SCM permissions together form the access menu.
        role_widgets = _get_accessible_widgets(user.role)

        # User summary returned to the client (no credential fields).
        user_summary = {
            "user_id": user.user_id,
            "username": user.username,
            "email": user.email,
            "merchant_id": user.merchant_id,
            "merchant_type": user.merchant_type,
            "full_name": user.full_name,
            "role": user.role,
            "status": user.status.value,
            "last_login_at": user.last_login_at
        }
        menu = {
            "permissions": scm_permissions,
            "accessible_widgets": role_widgets
        }

        # Translate the password rotation state into an optional warning.
        rotation = user_service.get_password_rotation_status(user)
        rotation_state = rotation["password_status"]
        notice = None
        if rotation_state == "expired":
            age_days = rotation["password_age_days"]
            notice = f"Your password has expired. Please change your password immediately. Your password is {age_days} days old."
            logger.warning(
                f"Password rotation required for user: {user.username}",
                extra={
                    "event": "password_rotation_required",
                    "user_id": user.user_id,
                    "password_age_days": rotation["password_age_days"]
                }
            )
        elif rotation_state == "warning":
            days_left = rotation["days_until_expiry"]
            notice = f"Your password will expire in {days_left} day(s). Please change your password soon."
            logger.info(
                f"Password rotation warning for user: {user.username}",
                extra={
                    "event": "password_rotation_warning",
                    "user_id": user.user_id,
                    "days_until_expiry": days_left
                }
            )

        logger.info(f"✅ User logged in successfully: {user.username}")
        lifetime_seconds = int(token_lifetime.total_seconds())
        return LoginResponse(
            access_token=issued_access_token,
            refresh_token=issued_refresh_token,
            expires_in=lifetime_seconds,
            user=user_summary,
            access_menu=menu,
            warnings=notice
        )
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Unexpected login error for {login_data.email_or_phone}: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during authentication"
        )
class OAuth2LoginRequest(BaseModel):
    """Login request shaped like an OAuth2 password-grant form."""

    # Login identifier; may be an email address or a phone number.
    username: str
    password: str
    # Grant type; defaults to the password grant.
    grant_type: str = "password"
@router.post("/refresh")
async def refresh_token(
    refresh_data: TokenRefreshRequest,
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Refresh access token using refresh token.

    The refresh token is verified, the referenced user is re-loaded and must
    still be active, and a new access token is issued from the user's current
    record.

    Returns:
        dict with ``access_token``, ``token_type`` ("bearer") and
        ``expires_in`` (access-token lifetime in whole seconds).

    Raises:
        HTTPException: 400 - Missing or blank refresh token
        HTTPException: 401 - Token invalid/expired, user missing, or user inactive
        HTTPException: 500 - Server error
    """
    try:
        # Validate input
        if not refresh_data.refresh_token or not refresh_data.refresh_token.strip():
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Refresh token is required"
            )

        # Verify refresh token. Only the verification call itself is guarded,
        # so the "payload is None" rejection below is no longer caught,
        # logged and re-raised by this same handler.
        try:
            payload = user_service.verify_token(refresh_data.refresh_token, "refresh")
        except Exception as verify_error:
            logger.warning(f"Token verification failed: {verify_error}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired refresh token",
                headers={"WWW-Authenticate": "Bearer"}
            )
        if payload is None:
            logger.warning("Token verification failed: invalid or expired refresh token")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired refresh token",
                headers={"WWW-Authenticate": "Bearer"}
            )

        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload"
            )

        # Get user to verify they still exist and are active
        try:
            user = await user_service.get_user_by_id(user_id)
        except Exception as db_error:
            logger.error(f"Database error fetching user {user_id}: {db_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to verify user status"
            )

        if not user:
            logger.warning(f"Token refresh attempted for non-existent user: {user_id}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found"
            )

        if user.status.value != "active":
            logger.warning(f"Token refresh attempted for inactive user: {user_id}, status: {user.status.value}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"User account is {user.status.value}"
            )

        # Take the username from the freshly loaded user (as the login
        # endpoint does) instead of the old token payload, which may be
        # stale or lack the claim entirely.
        username = user.username

        # Create new access token
        try:
            access_token_expires = timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
            new_access_token = user_service.create_access_token(
                data={
                    "sub": user_id,
                    "username": username,
                    "role": user.role,
                    "merchant_id": user.merchant_id,
                    "merchant_type": user.merchant_type,
                },
                expires_delta=access_token_expires
            )
        except Exception as token_error:
            logger.error(f"Error creating new access token: {token_error}", exc_info=True)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to generate new access token"
            )

        logger.info(f"Token refreshed successfully for user: {username}")
        return {
            "access_token": new_access_token,
            "token_type": "bearer",
            # Derived from the same timedelta handed to the token factory and
            # truncated to an int, matching the login response.
            "expires_in": int(access_token_expires.total_seconds())
        }
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Unexpected token refresh error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during token refresh"
        )
@router.get("/me")
async def get_current_user_info(
    current_user: SystemUserModel = Depends(get_current_user)
):
    """
    Return the profile of the user resolved from the request's credentials.

    Raises:
        HTTPException: 401 - Unauthorized (invalid or missing token)
        HTTPException: 500 - The user record could not be read
    """
    try:
        profile = {
            "user_id": current_user.user_id,
            "username": current_user.username,
            "email": current_user.email,
            "full_name": current_user.full_name,
            "role": current_user.role,
            "permissions": current_user.permissions,
            "status": current_user.status.value,
            "last_login_at": current_user.last_login_at,
            "timezone": current_user.timezone,
            "language": current_user.language
        }
    except AttributeError as attr_error:
        # A field expected on the user object is missing.
        logger.error(f"Error accessing user attributes: {attr_error}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error retrieving user information"
        )
    except Exception as unexpected:
        logger.error(f"Unexpected error getting current user info: {str(unexpected)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred"
        )
    return profile
@router.post("/logout")
async def logout(
    request: Request,
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Log out the current user and record the event for auditing.

    Requires a JWT in the Authorization header (Bearer token). The caller's
    IP address and User-Agent are passed to the user service together with
    the user when the logout is recorded.

    **Note:** No token is revoked here; the client is responsible for:
    - Removing the token from local storage/cookies
    - Clearing any cached user data
    - Redirecting to login page

    For enhanced security in production:
    - Consider implementing token blacklisting
    - Use short-lived access tokens with refresh tokens
    - Implement server-side session management if needed

    Raises:
        HTTPException: 401 - Unauthorized (invalid or missing token)
        HTTPException: 500 - Server error
    """
    try:
        # Caller metadata for the audit record.
        peer = request.client
        remote_ip = peer.host if peer else None
        agent = request.headers.get("User-Agent")

        await user_service.record_logout(
            user=current_user,
            ip_address=remote_ip,
            user_agent=agent
        )

        audit_fields = {
            "event": "logout_success",
            "user_id": current_user.user_id,
            "username": current_user.username,
            "ip_address": remote_ip
        }
        logger.info(
            f"User logged out successfully: {current_user.username}",
            extra=audit_fields
        )
        return {
            "success": True,
            "message": "Successfully logged out"
        }
    except AttributeError as attr_error:
        logger.error(
            f"Error accessing user during logout: {attr_error}",
            extra={"error_type": "attribute_error"},
            exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error during logout"
        )
    except Exception as unexpected:
        failure_fields = {
            "error_type": type(unexpected).__name__,
            "user_id": current_user.user_id if current_user else None
        }
        logger.error(
            f"Unexpected logout error: {str(unexpected)}",
            extra=failure_fields,
            exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="An unexpected error occurred during logout"
        )
@router.post("/test-login")
async def test_login():
    """
    Test endpoint to verify authentication system is working.

    The response always contains ``message`` and ``test_credentials``.
    ``test_credentials`` is an empty list unless the deployment explicitly
    opts in by setting ``settings.EXPOSE_TEST_CREDENTIALS`` to a truthy
    value; only then are the sample accounts returned.
    """
    # SECURITY: this route takes no authentication dependency, so returning
    # account passwords unconditionally hands privileged logins to any caller.
    # Disclosure is therefore opt-in and defaults to off when the setting is
    # not defined.
    expose_credentials = bool(getattr(settings, "EXPOSE_TEST_CREDENTIALS", False))

    sample_accounts = []
    if expose_credentials:
        # NOTE(review): these secrets are still hard-coded in source; if the
        # accounts exist in any real environment, rotate the passwords and
        # move the samples out of the codebase.
        sample_accounts = [
            {
                "type": "Super Admin",
                "email": "superadmin@cuatrolabs.com",
                "password": "SuperAdmin@123",
                "description": "Full system access"
            },
            {
                "type": "Company Admin",
                "email": "admin@cuatrolabs.com",
                "password": "CompanyAdmin@123",
                "description": "Company-wide management"
            },
            {
                "type": "Manager",
                "email": "manager@cuatrolabs.com",
                "password": "Manager@123",
                "description": "Team management"
            }
        ]

    return {
        "message": "Authentication system is ready",
        "test_credentials": sample_accounts
    }
@router.get("/access-roles")
async def get_access_roles(
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    List the access roles known to the user service with their permissions.

    Each role is reduced to ``role_id``, ``role_name``, ``description``,
    ``permissions`` (default ``{}``) and ``is_active`` (default ``True``).
    A ``None`` result from the service is treated as an empty list.

    Raises:
        HTTPException: 500 - Database or server error
    """
    # NOTE(review): no authentication dependency is declared on this route;
    # confirm that exposing the role/permission structure publicly is intended.
    try:
        roles = await user_service.get_all_roles()
        if roles is None:
            logger.warning("get_all_roles returned None")
            roles = []

        role_count = len(roles)
        role_entries = []
        for role in roles:
            role_entries.append({
                "role_id": role.get("role_id"),
                "role_name": role.get("role_name"),
                "description": role.get("description"),
                "permissions": role.get("permissions", {}),
                "is_active": role.get("is_active", True)
            })

        return {
            "success": True,
            "message": "Access roles with grouped permissions structure",
            "total_roles": role_count,
            "roles": role_entries
        }
    except Exception as fetch_error:
        logger.error(f"Error fetching access roles: {str(fetch_error)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch access roles"
        )
@router.get("/password-rotation-status")
async def get_password_rotation_status(
    current_user: SystemUserModel = Depends(get_current_user),
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Report the password rotation status of the authenticated user.

    The status is computed by the user service and returned unchanged under
    the ``data`` key.

    **Response:**
    - **password_status**: active, warning, or expired
    - **password_age_days**: Days since last password change (-1 if never changed)
    - **password_rotation_days_required**: Number of days before rotation is required
    - **days_until_expiry**: Days remaining before password expires
    - **last_password_change**: Timestamp of last password change
    - **requires_password_change**: Whether user must change password immediately
    - **warning_threshold_days**: Number of days before warning is shown

    Raises:
        HTTPException: 401 - Unauthorized (invalid or missing token)
        HTTPException: 500 - Server error
    """
    try:
        rotation = user_service.get_password_rotation_status(current_user)

        log_fields = {
            "user_id": current_user.user_id,
            "password_status": rotation["password_status"],
            "password_age_days": rotation["password_age_days"]
        }
        logger.info(
            f"Password rotation status retrieved for user: {current_user.username}",
            extra=log_fields
        )

        return {"success": True, "data": rotation}
    except Exception as lookup_error:
        logger.error(f"Error fetching password rotation status: {str(lookup_error)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch password rotation status"
        )
@router.post("/password-rotation-policy")
async def get_password_rotation_policy(
    user_service: SystemUserService = Depends(get_system_user_service)
):
    """
    Return the configured password rotation policy.

    The values are read straight from application settings; the route
    declares no authentication dependency.

    **Response:**
    - **password_rotation_days**: Days required before password rotation
    - **password_rotation_warning_days**: Days before expiry to show warning
    - **enforce_password_rotation**: Whether rotation is enforced
    - **allow_login_with_expired_password**: Whether users can login with expired password

    Raises:
        HTTPException: 500 - Server error
    """
    # NOTE(review): `user_service` is injected but never used in this handler.
    try:
        rotation_days = settings.PASSWORD_ROTATION_DAYS
        warning_days = settings.PASSWORD_ROTATION_WARNING_DAYS

        # Human-readable summary of the two thresholds.
        summary = (
            f"Users must change their password every {rotation_days} days. "
            f"A warning is shown {warning_days} days before expiration."
        )
        policy_details = {
            "password_rotation_days": rotation_days,
            "password_rotation_warning_days": warning_days,
            "enforce_password_rotation": settings.ENFORCE_PASSWORD_ROTATION,
            "allow_login_with_expired_password": settings.ALLOW_LOGIN_WITH_EXPIRED_PASSWORD,
            "description": summary
        }

        logger.debug("Password rotation policy retrieved")
        return {"success": True, "policy": policy_details}
    except Exception as policy_error:
        logger.error(f"Error fetching password rotation policy: {str(policy_error)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch password rotation policy"
        )