Spaces:
Sleeping
Sleeping
| """ | |
| Authentication dependencies for FastAPI. | |
| """ | |
| from typing import Optional | |
| from datetime import datetime | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from app.system_users.models.model import SystemUserModel, UserRole | |
| from app.system_users.services.service import SystemUserService | |
| from app.nosql import get_database | |
| from app.core.logging import get_logger | |
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Bearer-token security scheme; used as the default credentials dependency
# of get_current_user.
security = HTTPBearer()
def get_system_user_service() -> SystemUserService:
    """
    Dependency to get SystemUserService instance.

    Returns:
        SystemUserService: Service instance built from the object returned
            by ``get_database()``.

    Raises:
        HTTPException: 503 - ``get_database()`` returned ``None``
            ("Database service unavailable"), or any other error occurred
            while building the service ("Authentication service unavailable").
    """
    try:
        # get_database() returns AsyncIOMotorDatabase directly, no await needed
        db = get_database()
        if db is None:
            logger.error(
                "Database connection is None",
                extra={"service": "system_user_service"}
            )
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Database service unavailable"
            )
        return SystemUserService(db)
    except HTTPException:
        # The 503 raised above is already well-formed; let it through untouched
        # so the generic handler below does not rewrite its detail message.
        raise
    except Exception as e:
        logger.error(
            "Error getting system user service",
            extra={
                "error": str(e),
                "error_type": type(e).__name__
            },
            exc_info=True
        )
        # Chain the original error so the root cause stays attached to the
        # HTTP error as __cause__ for tracebacks and debuggers.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication service unavailable"
        ) from e
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    user_service: SystemUserService = Depends(get_system_user_service)
) -> SystemUserModel:
    """
    Get current authenticated user from JWT token.

    The token is verified as an "access" token, its ``sub`` claim is used to
    load the user through ``user_service.get_user_by_id``, and the user's
    status must be "active".

    Args:
        credentials: HTTP Bearer token credentials
        user_service: System user service instance

    Returns:
        SystemUserModel: The authenticated user

    Raises:
        HTTPException: 401 - Missing/invalid credentials, token verification
            failure, missing ``sub`` claim, or user not found
        HTTPException: 403 - User account not active
        HTTPException: 500 - Error raised while fetching the user
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # Validate credentials
        if not credentials or not credentials.credentials:
            logger.warning(
                "Missing authentication credentials",
                extra={"error_type": "missing_credentials"}
            )
            raise credentials_exception

        # Verify token
        try:
            payload = user_service.verify_token(credentials.credentials, "access")
        except Exception as token_error:
            logger.warning(
                "Token verification failed",
                extra={
                    "error": str(token_error),
                    "error_type": "token_verification_failed"
                }
            )
            # Keep the verification error attached as the cause of the 401.
            raise credentials_exception from token_error

        if payload is None:
            logger.warning(
                "Token verification returned None",
                extra={"error_type": "invalid_token"}
            )
            raise credentials_exception

        # The claim may be absent, hence Optional until checked below.
        user_id: Optional[str] = payload.get("sub")
        if user_id is None:
            logger.warning(
                "Token payload missing 'sub' claim",
                extra={"error_type": "missing_sub_claim"}
            )
            raise credentials_exception
    except HTTPException:
        # 401s raised above pass through unchanged.
        raise
    except Exception as e:
        # Anything unexpected during validation is reported to the client as
        # a plain 401; the details go to the log only.
        logger.error(
            "Unexpected error validating credentials",
            extra={
                "error": str(e),
                "error_type": type(e).__name__
            },
            exc_info=True
        )
        raise credentials_exception from e

    # Get user from database
    try:
        user = await user_service.get_user_by_id(user_id)
    except Exception as db_error:
        logger.error(
            "Database error fetching user",
            extra={
                "user_id": user_id,
                "error": str(db_error),
                "error_type": type(db_error).__name__
            },
            exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to authenticate user"
        ) from db_error

    if user is None:
        logger.warning(
            "User not found",
            extra={
                "user_id": user_id,
                "error_type": "user_not_found"
            }
        )
        raise credentials_exception

    # Check if user is active
    if user.status.value != "active":
        logger.warning(
            "Inactive user attempted access",
            extra={
                "user_id": user_id,
                "status": user.status.value,
                "error_type": "inactive_user"
            }
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"User account is {user.status.value}"
        )

    return user
async def get_current_active_user(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Return the user resolved by ``get_current_user`` unchanged.

    This dependency adds no checks of its own; it exists as a more
    descriptive name for ``get_current_user``.

    Args:
        current_user: The authenticated user

    Returns:
        SystemUserModel: The same user object that was injected
    """
    return current_user
async def require_admin_role(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Require admin or super_admin role.

    The user's ``role_id`` must be one of "admin", "super_admin",
    "role_super_admin" or "role_company_admin".

    Args:
        current_user: The authenticated user

    Returns:
        SystemUserModel: The authenticated admin user

    Raises:
        HTTPException: 403 - Insufficient privileges
    """
    admin_roles = ("admin", "super_admin", "role_super_admin", "role_company_admin")
    if current_user.role_id in admin_roles:
        return current_user

    # Read the identifier defensively (``user_id`` first, then ``id``) so a
    # missing attribute cannot raise while building the log record and turn
    # the intended 403 into a 500.
    user_identifier = getattr(current_user, "user_id", None)
    if user_identifier is None:
        user_identifier = getattr(current_user, "id", None)

    logger.warning(
        "User attempted admin action without privileges",
        extra={
            "user_id": str(user_identifier),
            "username": current_user.username,
            "user_role": current_user.role_id,
            "error_type": "insufficient_privileges"
        }
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin privileges required"
    )
async def require_super_admin_role(
    current_user: SystemUserModel = Depends(get_current_user)
) -> SystemUserModel:
    """
    Require super_admin role.

    The user's ``role_id`` must be "super_admin" or "role_super_admin".

    Args:
        current_user: The authenticated user

    Returns:
        SystemUserModel: The authenticated super admin user

    Raises:
        HTTPException: 403 - Insufficient privileges
    """
    super_admin_roles = ("super_admin", "role_super_admin")
    if current_user.role_id in super_admin_roles:
        return current_user

    # Read the identifier defensively (``id`` first, then ``user_id``) so a
    # missing attribute cannot raise while building the log record and turn
    # the intended 403 into a 500.
    user_identifier = getattr(current_user, "id", None)
    if user_identifier is None:
        user_identifier = getattr(current_user, "user_id", None)

    logger.warning(
        "User attempted super admin action without privileges",
        extra={
            "user_id": str(user_identifier),
            "username": current_user.username,
            "user_role": current_user.role_id,
            "error_type": "insufficient_privileges"
        }
    )
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Super admin privileges required"
    )
def require_permission(permission: str):
    """
    Dependency factory to require specific permission.

    Users whose ``role_id`` is "admin", "super_admin", "role_super_admin" or
    "role_company_admin" pass without a permission lookup; everyone else must
    have ``permission`` in ``current_user.permissions``.

    Args:
        permission: The required permission string

    Returns:
        Callable: Dependency function that checks for the permission and
            returns the authenticated user, or raises HTTPException 403.
    """
    async def permission_checker(
        current_user: SystemUserModel = Depends(get_current_user)
    ) -> SystemUserModel:
        # Super admins and admins have all permissions
        if current_user.role_id in ("admin", "super_admin", "role_super_admin", "role_company_admin"):
            return current_user

        # Check if user has the specific permission. A missing (None)
        # permission collection is treated as "no permissions" so the caller
        # gets a 403 rather than a TypeError-driven 500.
        granted = current_user.permissions or ()
        if permission in granted:
            return current_user

        # Read the identifier defensively (``id`` first, then ``user_id``) so
        # a missing attribute cannot raise while building the log record.
        user_identifier = getattr(current_user, "id", None)
        if user_identifier is None:
            user_identifier = getattr(current_user, "user_id", None)

        logger.warning(
            "User lacks required permission",
            extra={
                "user_id": str(user_identifier),
                "username": current_user.username,
                "required_permission": permission,
                "user_role": current_user.role_id,
                "error_type": "permission_denied"
            }
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Permission '{permission}' required"
        )

    return permission_checker
async def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)),
    user_service: SystemUserService = Depends(get_system_user_service)
) -> Optional[SystemUserModel]:
    """
    Resolve the caller's user when a usable bearer token is present.

    Intended for endpoints that serve both anonymous and authenticated
    callers: every failure path yields ``None`` instead of an HTTP error.

    Args:
        credentials: Optional HTTP Bearer token credentials
        user_service: System user service instance

    Returns:
        Optional[SystemUserModel]: The user when the token verifies as an
            "access" token, carries a ``sub`` claim, and maps to a user whose
            status is "active"; otherwise ``None``.
    """
    # No Authorization header was supplied at all.
    if credentials is None:
        return None

    try:
        token = credentials.credentials
        if not token:
            return None

        claims = user_service.verify_token(token, "access")
        if claims is None:
            logger.debug(
                "Optional token verification failed",
                extra={"authentication_type": "optional"}
            )
            return None

        subject = claims.get("sub")
        if subject is None:
            return None

        # Look the user up and accept only active accounts.
        account = await user_service.get_user_by_id(subject)
        if account is not None and account.status.value == "active":
            return account
        return None
    except Exception as exc:
        # Best-effort by design: any error means "not authenticated".
        logger.debug(
            "Optional user authentication failed",
            extra={
                "error": str(exc),
                "authentication_type": "optional"
            }
        )
        return None