# Page-scrape residue (not part of the module): MukeshKapoor25's picture / role id fix / b81506c
"""
Authentication dependencies for FastAPI.
"""
from typing import Optional
from datetime import datetime
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.system_users.models.model import SystemUserModel, UserRole
from app.system_users.services.service import SystemUserService
from app.nosql import get_database
from app.core.logging import get_logger
# Module-level logger, named after this module; used by every dependency below.
logger = get_logger(__name__)
# Bearer-token security scheme; get_current_user receives the parsed
# Authorization credentials through Depends(security).
security = HTTPBearer()
def get_system_user_service() -> SystemUserService:
    """
    Build the SystemUserService used by the authentication dependencies.

    Returns:
        SystemUserService: Service constructed around the handle returned
            by get_database()

    Raises:
        HTTPException: 503 - get_database() returned None, or obtaining the
            database / constructing the service raised
    """
    try:
        # get_database() is called synchronously (no await) and its result is
        # handed straight to the service constructor.
        database = get_database()
        if database is not None:
            return SystemUserService(database)

        logger.error(
            "Database connection is None",
            extra={"service": "system_user_service"},
        )
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Database service unavailable",
        )
    except HTTPException:
        # The 503 raised above is already the response we want; do not let the
        # generic handler below rewrite its detail message.
        raise
    except Exception as exc:
        failure_details = {
            "error": str(exc),
            "error_type": type(exc).__name__,
        }
        logger.error(
            "Error getting system user service",
            extra=failure_details,
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service unavailable",
        )
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    user_service: SystemUserService = Depends(get_system_user_service)
) -> SystemUserModel:
    """
    Resolve the bearer token on the request to an active system user.

    The work happens in two phases: the token is verified and its ``sub``
    claim extracted, then the user is loaded and its status checked.

    Args:
        credentials: HTTP Bearer token credentials
        user_service: System user service instance

    Returns:
        SystemUserModel: The authenticated user

    Raises:
        HTTPException: 401 - Missing token, failed verification, missing
            ``sub`` claim, or no user found for that id
        HTTPException: 403 - User status is anything other than "active"
        HTTPException: 500 - Loading the user raised
    """
    # One shared 401 so every credential failure looks the same to the client.
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Phase 1: token -> user id
    try:
        bearer_token = credentials.credentials if credentials else None
        if not bearer_token:
            logger.warning(
                "Missing authentication credentials",
                extra={"error_type": "missing_credentials"},
            )
            raise unauthorized

        try:
            claims = user_service.verify_token(bearer_token, "access")
        except Exception as token_error:
            # Any failure inside verification is reported as a plain 401.
            logger.warning(
                "Token verification failed",
                extra={
                    "error": str(token_error),
                    "error_type": "token_verification_failed",
                },
            )
            raise unauthorized

        if claims is None:
            logger.warning(
                "Token verification returned None",
                extra={"error_type": "invalid_token"},
            )
            raise unauthorized

        user_id = claims.get("sub")
        if user_id is None:
            logger.warning(
                "Token payload missing 'sub' claim",
                extra={"error_type": "missing_sub_claim"},
            )
            raise unauthorized
    except HTTPException:
        # The 401s raised above pass through untouched.
        raise
    except Exception as exc:
        logger.error(
            "Unexpected error validating credentials",
            extra={
                "error": str(exc),
                "error_type": type(exc).__name__,
            },
            exc_info=True,
        )
        raise unauthorized

    # Phase 2: user id -> user record
    try:
        user = await user_service.get_user_by_id(user_id)
    except Exception as db_error:
        lookup_failure = {
            "user_id": user_id,
            "error": str(db_error),
            "error_type": type(db_error).__name__,
        }
        logger.error(
            "Database error fetching user",
            extra=lookup_failure,
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to authenticate user",
        )

    if user is None:
        logger.warning(
            "User not found",
            extra={
                "user_id": user_id,
                "error_type": "user_not_found",
            },
        )
        raise unauthorized

    # Only accounts whose status value is exactly "active" may proceed.
    account_status = user.status.value
    if account_status == "active":
        return user

    logger.warning(
        "Inactive user attempted access",
        extra={
            "user_id": user_id,
            "status": account_status,
            "error_type": "inactive_user",
        },
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail=f"User account is {account_status}",
    )
async def get_current_active_user(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Get current active user (alias for get_current_user for clarity).

    No additional check is performed here: get_current_user already rejects
    any user whose status is not "active" with a 403 before this runs.

    Args:
        current_user: The user resolved by get_current_user

    Returns:
        SystemUserModel: The same user object, unchanged
    """
    return current_user
async def require_admin_role(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Require one of the admin-level role ids.

    Accepted ``role_id`` values: "admin", "super_admin", "role_super_admin",
    "role_company_admin".

    Args:
        current_user: The authenticated user

    Returns:
        SystemUserModel: The authenticated admin user

    Raises:
        HTTPException: 403 - Insufficient privileges
    """
    admin_roles = ("admin", "super_admin", "role_super_admin", "role_company_admin")
    if current_user.role_id in admin_roles:
        return current_user

    # The model attribute holding the identifier is not visible from this
    # module, so both `user_id` and `id` are tried. A missing attribute must
    # not raise here, or the caller would get a 500 instead of the 403 below.
    user_ref = getattr(current_user, "user_id", None)
    if user_ref is None:
        user_ref = getattr(current_user, "id", None)

    logger.warning(
        "User attempted admin action without privileges",
        extra={
            "user_id": str(user_ref),
            "username": current_user.username,
            "user_role": current_user.role_id,
            "error_type": "insufficient_privileges"
        }
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required"
    )
async def require_super_admin_role(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Require one of the super-admin role ids.

    Accepted ``role_id`` values: "super_admin", "role_super_admin".

    Args:
        current_user: The authenticated user

    Returns:
        SystemUserModel: The authenticated super admin user

    Raises:
        HTTPException: 403 - Insufficient privileges
    """
    super_admin_roles = ("super_admin", "role_super_admin")
    if current_user.role_id in super_admin_roles:
        return current_user

    # The model attribute holding the identifier is not visible from this
    # module, so both `user_id` and `id` are tried. A missing attribute must
    # not raise here, or the caller would get a 500 instead of the 403 below.
    user_ref = getattr(current_user, "user_id", None)
    if user_ref is None:
        user_ref = getattr(current_user, "id", None)

    logger.warning(
        "User attempted super admin action without privileges",
        extra={
            "user_id": str(user_ref),
            "username": current_user.username,
            "user_role": current_user.role_id,
            "error_type": "insufficient_privileges"
        }
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Super admin privileges required"
    )
def require_permission(permission: str):
    """
    Dependency factory to require specific permission.

    Users whose ``role_id`` is one of "admin", "super_admin",
    "role_super_admin" or "role_company_admin" pass without a permission
    lookup; everyone else must have ``permission`` in ``permissions``.

    Args:
        permission: The required permission string

    Returns:
        Callable: Async dependency that returns the current user when the
            check passes and raises HTTPException 403 otherwise
    """
    async def permission_checker(
        current_user: SystemUserModel = Depends(get_current_user)
    ) -> SystemUserModel:
        # Super admins and admins have all permissions
        if current_user.role_id in ("admin", "super_admin", "role_super_admin", "role_company_admin"):
            return current_user

        # Membership test against the user's permissions container.
        # NOTE(review): the container type is not visible here - confirm that
        # `in` checks what is intended (e.g. list items vs. dict keys).
        if permission in current_user.permissions:
            return current_user

        # The model attribute holding the identifier is not visible from this
        # module, so both `user_id` and `id` are tried. A missing attribute
        # must not raise here, or the caller would get a 500 instead of the
        # 403 below.
        user_ref = getattr(current_user, "user_id", None)
        if user_ref is None:
            user_ref = getattr(current_user, "id", None)

        logger.warning(
            "User lacks required permission",
            extra={
                "user_id": str(user_ref),
                "username": current_user.username,
                "required_permission": permission,
                "user_role": current_user.role_id,
                "error_type": "permission_denied"
            }
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permission '{permission}' required"
        )

    return permission_checker
async def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)),
    user_service: SystemUserService = Depends(get_system_user_service)
) -> Optional[SystemUserModel]:
    """
    Best-effort variant of get_current_user for endpoints that also serve
    anonymous callers.

    Every failure mode (no header, empty token, failed verification, missing
    ``sub`` claim, unknown user, non-active user, or any exception along the
    way) yields None instead of an HTTP error.

    Args:
        credentials: Optional HTTP Bearer token credentials
        user_service: System user service instance

    Returns:
        Optional[SystemUserModel]: The authenticated active user, or None
    """
    if credentials is None:
        return None

    try:
        bearer_token = credentials.credentials
        if not bearer_token:
            return None

        claims = user_service.verify_token(bearer_token, "access")
        if claims is None:
            logger.debug(
                "Optional token verification failed",
                extra={"authentication_type": "optional"},
            )
            return None

        subject = claims.get("sub")
        if subject is None:
            return None

        # Only a user that exists and whose status value is "active" counts.
        candidate = await user_service.get_user_by_id(subject)
        is_active = candidate is not None and candidate.status.value == "active"
        return candidate if is_active else None
    except Exception as exc:
        # Deliberately swallowed: authentication is optional on these routes.
        logger.debug(
            "Optional user authentication failed",
            extra={
                "error": str(exc),
                "authentication_type": "optional",
            },
        )
        return None