""" System User service for authentication and user management. """ import secrets from datetime import datetime, timedelta from typing import Optional, List, Dict, Any, Tuple from uuid import UUID import uuid from motor.motor_asyncio import AsyncIOMotorDatabase from passlib.context import CryptContext from jose import JWTError, jwt from fastapi import HTTPException, status import aiohttp from app.system_users.models.model import ( SystemUserModel, UserStatus, UserRole, LoginAttemptModel, SecuritySettingsModel ) from app.system_users.schemas.schema import ( CreateUserRequest, UpdateUserRequest, ChangePasswordRequest, UserInfoResponse ) from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION, SCM_ACCESS_ROLES_COLLECTION from app.core.config import settings from app.core.logging import get_logger from app.utils.email_service import email_service logger = get_logger(__name__) # Password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT settings - using settings from config class SystemUserService: """Service class for system user operations.""" def __init__(self, db: AsyncIOMotorDatabase): self.db = db self.collection = db[AUTH_SYSTEM_USERS_COLLECTION] async def get_merchant_info(self, merchant_id: str) -> Optional[Dict[str, Any]]: """Get merchant information from SCM database.""" try: # Try to get from SCM database directly # Assuming both services use the same MongoDB instance but different databases scm_db_name = "cuatrolabs_scm" # SCM database name scm_db = self.collection.database.client.get_database(scm_db_name) scm_merchants_collection = scm_db["scm_merchants"] merchant = await scm_merchants_collection.find_one( {"merchant_id": merchant_id}, {"merchant_id": 1, "merchant_type": 1, "merchant_name": 1} ) if merchant: logger.info(f"Found merchant {merchant_id} with type {merchant.get('merchant_type')}") return merchant else: logger.warning(f"Merchant {merchant_id} not found in SCM database") return None except Exception as e: logger.warning(f"Error fetching merchant info from SCM database: {e}") return None @staticmethod def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" try: return pwd_context.verify(plain_password, hashed_password) except Exception as e: logger.error(f"Error verifying password: {e}") return False @staticmethod def get_password_hash(password: str) -> str: """Generate password hash.""" return pwd_context.hash(password) @staticmethod def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token.""" to_encode = data.copy() # Convert UUID objects to strings for JSON serialization for key, value in to_encode.items(): if isinstance(value, UUID): to_encode[key] = str(value) if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) # Only set type to "access" if not already specified in data if "type" not in to_encode: to_encode["type"] = "access" to_encode["exp"] = expire encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt @staticmethod def create_refresh_token(data: dict) -> str: """Create JWT refresh token.""" to_encode = data.copy() # Convert UUID objects to strings for JSON serialization for key, value in to_encode.items(): if isinstance(value, UUID): to_encode[key] = str(value) expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt @staticmethod def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]: """Verify JWT token and return payload.""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) if payload.get("type") != token_type: return None return payload except JWTError as e: logger.warning(f"Token verification failed: {e}") return None async def get_user_by_id(self, user_id: UUID) -> Optional[SystemUserModel]: """Get user by user_id.""" try: user_doc = await self.collection.find_one({"user_id": str(user_id)}) if user_doc: return SystemUserModel(**user_doc) return None except Exception as e: logger.error(f"Error getting user by ID {user_id}: {e}") return None async def get_user_by_username(self, username: str) -> Optional[SystemUserModel]: """Get user by username.""" try: user_doc = await self.collection.find_one({"username": username.lower()}) if user_doc: return SystemUserModel(**user_doc) return None except Exception as e: logger.error(f"Error getting user by username {username}: {e}") return None async def get_user_by_email(self, email: str) -> Optional[SystemUserModel]: """Get user by email.""" try: user_doc = await self.collection.find_one({"email": email.lower()}) if user_doc: return SystemUserModel(**user_doc) return None except Exception as e: logger.error(f"Error getting user by email {email}: {e}") return None async def get_user_by_phone(self, phone: str) -> Optional[SystemUserModel]: """Get user by phone number.""" try: user_doc = await self.collection.find_one({"phone": phone}) if user_doc: return SystemUserModel(**user_doc) return None except Exception as e: logger.error(f"Error getting user by phone {phone}: {e}") return None async def create_user(self, user_data: CreateUserRequest, created_by: str) -> SystemUserModel: """Create a new user.""" try: # Check if username or email already exists existing_user = await self.get_user_by_username(user_data.username) if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) existing_email = await self.get_user_by_email(user_data.email) if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists" ) # Generate user ID # user_id = f"usr_{secrets.token_urlsafe(16)}" user_id=str(uuid.uuid4()) # Hash password password_hash = self.get_password_hash(user_data.password) # Get merchant_type if not provided merchant_type = user_data.merchant_type if not merchant_type and user_data.merchant_id: merchant_info = await self.get_merchant_info(user_data.merchant_id) if merchant_info: merchant_type = merchant_info.get('merchant_type') logger.info(f"Fetched merchant_type '{merchant_type}' for merchant_id '{user_data.merchant_id}'") # Create user model user_model = SystemUserModel( user_id=str(user_id), username=user_data.username.lower(), email=user_data.email.lower(), merchant_id=user_data.merchant_id, merchant_type=merchant_type, password_hash=password_hash, first_name=user_data.first_name, last_name=user_data.last_name, phone=user_data.phone, role=user_data.role, permissions=user_data.permissions, status=UserStatus.ACTIVE, # Set as active by default created_by=str(created_by), created_at=datetime.utcnow() ) doc = user_model.model_dump(by_alias=True, exclude_none=True) # safety: convert UUID -> str if any slips through for k, v in doc.items(): if isinstance(v, uuid.UUID): doc[k] = str(v) await self.collection.insert_one(doc) logger.info(f"User created successfully: {user_id}") return user_model except HTTPException: raise except Exception as e: logger.error(f"Error creating user: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create user" ) async def authenticate_user(self, email_or_phone: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Tuple[Optional[SystemUserModel], str]: """Authenticate user with email/phone/username and password.""" try: # Get user by email, phone, or username user = await self.get_user_by_email(email_or_phone) if not user: user = await self.get_user_by_phone(email_or_phone) if not user: user = await self.get_user_by_username(email_or_phone) # Record failed attempt if user not found if not user: logger.warning(f"Login attempt with non-existent email/phone/username: {email_or_phone}") return None, "Invalid email, phone number, or username" # Check if account is locked if (user.security_settings.account_locked_until and user.security_settings.account_locked_until > datetime.utcnow()): return None, f"Account is locked until {user.security_settings.account_locked_until}" # Check account status if user.status not in [UserStatus.ACTIVE]: return None, f"Account is {user.status.value}" # Verify password if not self.verify_password(password, user.password_hash): await self._record_failed_login(user, ip_address, user_agent) return None, "Invalid username or password" # Password correct - reset failed attempts and record successful login await self._record_successful_login(user, ip_address, user_agent) return user, "Authentication successful" except Exception as e: logger.error(f"Error during authentication: {e}") return None, "Authentication failed" async def _record_failed_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]): """Record failed login attempt and update security settings.""" try: failed_attempts = user.security_settings.failed_login_attempts + 1 now = datetime.utcnow() # Add login attempt to history login_attempt = LoginAttemptModel( timestamp=now, ip_address=ip_address, user_agent=user_agent, success=False, failure_reason="Invalid password" ) # Keep only last 10 attempts attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt] update_data = { "security_settings.failed_login_attempts": failed_attempts, "security_settings.last_failed_login": now, "security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history], "updated_at": now } # Lock account if too many failed attempts if failed_attempts >= settings.MAX_FAILED_LOGIN_ATTEMPTS: lock_until = now + timedelta(minutes=settings.ACCOUNT_LOCK_DURATION_MINUTES) update_data["security_settings.account_locked_until"] = lock_until logger.warning(f"Account locked due to failed login attempts: {user.user_id}") await self.collection.update_one( {"user_id": user.user_id}, {"$set": update_data} ) except Exception as e: logger.error(f"Error recording failed login: {e}") async def _record_successful_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]): """Record successful login and reset security counters.""" try: now = datetime.utcnow() # Add login attempt to history login_attempt = LoginAttemptModel( timestamp=now, ip_address=ip_address, user_agent=user_agent, success=True ) # Keep only last 10 attempts attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt] await self.collection.update_one( {"user_id": user.user_id}, {"$set": { "last_login_at": now, "last_login_ip": ip_address, "security_settings.failed_login_attempts": 0, "security_settings.last_failed_login": None, "security_settings.account_locked_until": None, "security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history], "updated_at": now }} ) except Exception as e: logger.error(f"Error recording successful login: {e}") async def record_logout(self, user: SystemUserModel, ip_address: Optional[str] = None, user_agent: Optional[str] = None): """ Record logout action for audit purposes. This logs the logout event to both the application logs and optionally to an audit collection. Args: user: The user who is logging out ip_address: IP address from which logout occurred user_agent: User agent string of the client """ try: now = datetime.utcnow() # Calculate session duration if last login is available session_duration = None if user.last_login_at: session_duration = (now - user.last_login_at).total_seconds() # Create audit log entry audit_log = { "event_type": "logout", "user_id": user.user_id, "username": user.username, "email": user.email, "merchant_id": user.merchant_id, "merchant_type": user.merchant_type, "role": user.role, "timestamp": now, "ip_address": ip_address, "user_agent": user_agent, "session_duration_seconds": session_duration, "last_login_at": user.last_login_at } # Log to application logs with structured data logger.info( f"User logout: {user.username}", extra={ "event": "user_logout", "user_id": user.user_id, "username": user.username, "email": user.email, "merchant_id": user.merchant_id, "merchant_type": user.merchant_type, "role": user.role, "ip_address": ip_address, "user_agent": user_agent, "session_duration": session_duration, "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None } ) # Store in audit logs collection for compliance and analysis try: auth_logs_collection = self.db["scm_auth_logs"] await auth_logs_collection.insert_one(audit_log) logger.debug(f"Logout audit record stored for user {user.user_id}") except Exception as audit_error: # Don't fail logout if audit logging fails logger.error( f"Failed to store logout audit log: {audit_error}", extra={"user_id": user.user_id} ) except Exception as e: logger.error( f"Error recording logout audit: {e}", extra={"user_id": user.user_id if user else None}, exc_info=True ) def is_password_expired(self, user: SystemUserModel) -> bool: """ Check if user's password has expired based on rotation policy. Args: user: The system user model Returns: bool: True if password is expired, False otherwise """ if not user.security_settings.last_password_change: # If no password change record, consider it expired (initial setup) return True now = datetime.utcnow() days_since_change = (now - user.security_settings.last_password_change).days return days_since_change >= settings.PASSWORD_ROTATION_DAYS def get_password_age_days(self, user: SystemUserModel) -> int: """ Get the age of user's password in days. Args: user: The system user model Returns: int: Number of days since last password change """ if not user.security_settings.last_password_change: return -1 # Never changed now = datetime.utcnow() return (now - user.security_settings.last_password_change).days def get_password_rotation_status(self, user: SystemUserModel) -> Dict[str, Any]: """ Get comprehensive password rotation status for a user. Args: user: The system user model Returns: Dict: Password rotation status information """ password_age = self.get_password_age_days(user) is_expired = self.is_password_expired(user) days_until_expiry = settings.PASSWORD_ROTATION_DAYS - password_age warning_threshold = settings.PASSWORD_ROTATION_WARNING_DAYS status = "expired" if is_expired else "active" if password_age >= 0 and days_until_expiry <= warning_threshold: status = "warning" return { "password_status": status, "password_age_days": password_age, "password_rotation_days_required": settings.PASSWORD_ROTATION_DAYS, "days_until_expiry": max(0, days_until_expiry), "last_password_change": user.security_settings.last_password_change, "requires_password_change": is_expired and settings.ENFORCE_PASSWORD_ROTATION, "warning_threshold_days": warning_threshold } async def mark_password_change_required(self, user_id: UUID, reason: str = "password_rotation") -> bool: """ Mark that a user must change their password. Args: user_id: User ID reason: Reason for requiring password change Returns: bool: Success status """ try: now = datetime.utcnow() audit_log = { "event_type": "password_change_required", "user_id": user_id, "reason": reason, "timestamp": now, "enforced_at": now } # Store audit log try: auth_logs_collection = self.db["scm_auth_logs"] await auth_logs_collection.insert_one(audit_log) except Exception as audit_error: logger.warning(f"Failed to store password change audit log: {audit_error}") # Update user record await self.collection.update_one( {"user_id": user_id}, {"$set": { "security_settings.require_password_change": True, "updated_at": now }} ) logger.info( f"Password change marked as required for user {user_id}", extra={"reason": reason, "user_id": user_id} ) return True except Exception as e: logger.error(f"Error marking password change required for {user_id}: {e}") return False async def record_password_change(self, user_id: UUID, changed_by: str = "user") -> bool: """ Record password change for rotation policy compliance. Args: user_id: User ID changed_by: Who initiated the change (user, admin, system) Returns: bool: Success status """ try: now = datetime.utcnow() audit_log = { "event_type": "password_changed", "user_id": user_id, "changed_by": changed_by, "timestamp": now, "previous_rotation_status": None } # Get user's previous status user = await self.get_user_by_id(user_id) if user: audit_log["previous_rotation_status"] = self.get_password_rotation_status(user) # Store audit log try: auth_logs_collection = self.db["scm_auth_logs"] await auth_logs_collection.insert_one(audit_log) except Exception as audit_error: logger.warning(f"Failed to store password change audit log: {audit_error}") logger.info( f"Password change recorded for user {user_id}", extra={ "event": "password_changed", "user_id": user_id, "changed_by": changed_by } ) return True except Exception as e: logger.error(f"Error recording password change for {user_id}: {e}") return False """Update user information.""" try: user = await self.get_user_by_id(user_id) if not user: return None update_dict = {} for field, value in update_data.dict(exclude_unset=True).items(): if value is not None: update_dict[field] = value if update_dict: update_dict["updated_at"] = datetime.utcnow() update_dict["updated_by"] = updated_by await self.collection.update_one( {"user_id": user_id}, {"$set": update_dict} ) return await self.get_user_by_id(user_id) except Exception as e: logger.error(f"Error updating user {user_id}: {e}") return None async def change_password(self, user_id: UUID, current_password: str, new_password: str) -> bool: """Change user password.""" try: user = await self.get_user_by_id(user_id) if not user: return False # Verify current password if not self.verify_password(current_password, user.password_hash): return False # Hash new password new_password_hash = self.get_password_hash(new_password) now = datetime.utcnow() # Update password await self.collection.update_one( {"user_id": str(user_id)}, {"$set": { "password_hash": new_password_hash, "security_settings.last_password_change": datetime.utcnow(), "security_settings.require_password_change": False, "last_login_at": now, "updated_at": datetime.utcnow() }} ) # Record password change for rotation policy compliance await self.record_password_change(user_id, changed_by="user") logger.info(f"Password changed for user: {user_id}") return True except Exception as e: logger.error(f"Error changing password for user {user_id}: {e}") return False async def list_users(self, page: int = 1, page_size: int = 20, status_filter: Optional[UserStatus] = None) -> Tuple[List[SystemUserModel], int]: """List users with pagination.""" try: skip = (page - 1) * page_size # Build query filter query_filter = {} if status_filter: query_filter["status"] = status_filter.value # Get total count total_count = await self.collection.count_documents(query_filter) # Get users - don't exclude password_hash to avoid validation errors cursor = self.collection.find(query_filter).skip(skip).limit(page_size).sort("created_at", -1) users = [] async for user_doc in cursor: users.append(SystemUserModel(**user_doc)) return users, total_count except Exception as e: logger.error(f"Error listing users: {e}") return [], 0 async def list_users_with_projection( self, filters: Optional[Dict[str, Any]] = None, skip: int = 0, limit: int = 100, projection_list: Optional[List[str]] = None, status_filter: Optional[UserStatus] = None, role_filter: Optional[str] = None, merchant_id_filter: Optional[str] = None, merchant_type_filter: Optional[str] = None ): """ List users with projection support following API standards. Args: filters: Additional filter criteria skip: Number of records to skip limit: Maximum number of records to return projection_list: List of fields to include in response status_filter: Filter by user status role_filter: Filter by user role merchant_id_filter: Filter by merchant ID merchant_type_filter: Filter by merchant type Returns: List of users (raw dict if projection, SystemUserModel otherwise) """ try: # Build query filter query_filter = {} # Add specific filters if status_filter: query_filter["status"] = status_filter.value if role_filter: query_filter["role"] = role_filter if merchant_id_filter: query_filter["merchant_id"] = merchant_id_filter if merchant_type_filter: query_filter["merchant_type"] = merchant_type_filter # Add additional filters if filters: query_filter.update(filters) # Build MongoDB projection projection_dict = None if projection_list: projection_dict = {field: 1 for field in projection_list} projection_dict["_id"] = 0 # Query with projection cursor = self.collection.find(query_filter, projection_dict).skip(skip).limit(limit).sort("created_at", -1) docs = await cursor.to_list(length=limit) # Return raw dict if projection, model otherwise if projection_list: return docs else: return [SystemUserModel(**doc) for doc in docs] except Exception as e: logger.error(f"Error listing users with projection: {e}") return [] async def deactivate_user(self, user_id: str, deactivated_by: str) -> bool: """Deactivate user account.""" try: result = await self.collection.update_one( {"user_id": user_id}, {"$set": { "status": UserStatus.INACTIVE.value, "updated_at": datetime.utcnow(), "updated_by": deactivated_by }} ) if result.modified_count > 0: logger.info(f"User deactivated: {user_id}") return True return False except Exception as e: logger.error(f"Error deactivating user {user_id}: {e}") return False async def create_password_reset_token(self, email: str) -> Optional[str]: """ Create a password reset token for a user. Args: email: User's email address Returns: Reset token string or None if user not found """ try: # Get user by email user = await self.get_user_by_email(email) if not user: logger.warning(f"Password reset requested for non-existent email: {email}") # Don't reveal that user doesn't exist for security return None # Check if account is active if user.status not in [UserStatus.ACTIVE, UserStatus.PENDING_ACTIVATION]: logger.warning(f"Password reset requested for {user.status.value} account: {email}") return None # Generate secure token reset_token = secrets.token_urlsafe(32) # Create JWT token with expiration token_data = { "sub": user.user_id, "email": user.email, "type": "password_reset", "token": reset_token, "exp": datetime.utcnow() + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRATION_MINUTES) } jwt_token = jwt.encode(token_data, settings.SECRET_KEY, algorithm=settings.ALGORITHM) # Store reset token info in database await self.collection.update_one( {"user_id": user.user_id}, {"$set": { "security_settings.password_reset_token": reset_token, "security_settings.password_reset_token_created_at": datetime.utcnow(), "updated_at": datetime.utcnow() }} ) logger.info(f"Password reset token created for user: {user.user_id}") return jwt_token except Exception as e: logger.error(f"Error creating password reset token: {e}") return None async def send_password_reset_email(self, email: str) -> bool: """ Generate password reset token and send reset email. Args: email: User's email address Returns: True if email was sent successfully, False otherwise """ try: # Get user user = await self.get_user_by_email(email) if not user: # Don't reveal that user doesn't exist logger.warning(f"Password reset email requested for non-existent email: {email}") return True # Return true to not leak information # Create reset token reset_token = await self.create_password_reset_token(email) if not reset_token: logger.error(f"Failed to create reset token for: {email}") return False # Build reset link reset_link = f"{settings.PASSWORD_RESET_BASE_URL}?token={reset_token}" # Send email email_sent = await email_service.send_password_reset_email( to_email=user.email, reset_link=reset_link, user_name=user.first_name ) if email_sent: logger.info(f"Password reset email sent to: {email}") else: logger.error(f"Failed to send password reset email to: {email}") return email_sent except Exception as e: logger.error(f"Error sending password reset email: {e}") return False async def verify_password_reset_token(self, token: str) -> Optional[Dict[str, Any]]: """ Verify password reset token and return user info. Args: token: JWT reset token Returns: Dict with user_id and email if valid, None otherwise """ try: # Decode JWT token payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) # Check token type if payload.get("type") != "password_reset": logger.warning("Invalid token type for password reset") return None user_id = payload.get("sub") email = payload.get("email") reset_token = payload.get("token") if not all([user_id, email, reset_token]): logger.warning("Missing required fields in reset token") return None # Get user from database user = await self.get_user_by_id(user_id) if not user: logger.warning(f"User not found for reset token: {user_id}") return None # Verify email matches if user.email != email: logger.warning(f"Email mismatch in reset token for user: {user_id}") return None # Verify token matches stored token stored_token = user.security_settings.password_reset_token if not stored_token or stored_token != reset_token: logger.warning(f"Reset token mismatch for user: {user_id}") return None # Check if token was used (token should be cleared after use) if not user.security_settings.password_reset_token: logger.warning(f"Reset token already used for user: {user_id}") return None return { "user_id": user_id, "email": email, "token": reset_token } except JWTError as e: logger.warning(f"Invalid or expired reset token: {e}") return None except Exception as e: logger.error(f"Error verifying reset token: {e}") return None async def reset_password_with_token(self, token: str, new_password: str) -> Tuple[bool, str]: """ Reset user password using reset token. Args: token: JWT reset token new_password: New password to set Returns: Tuple of (success: bool, message: str) """ try: # Verify token token_data = await self.verify_password_reset_token(token) if not token_data: return False, "Invalid or expired reset token" user_id = token_data["user_id"] # Hash new password new_password_hash = self.get_password_hash(new_password) # Update password and clear reset token result = await self.collection.update_one( {"user_id": user_id}, {"$set": { "password_hash": new_password_hash, "security_settings.password_reset_token": None, "security_settings.password_reset_token_created_at": None, "security_settings.last_password_change": datetime.utcnow(), "security_settings.require_password_change": False, "security_settings.failed_login_attempts": 0, "security_settings.account_locked_until": None, "updated_at": datetime.utcnow() }} ) if result.modified_count > 0: logger.info(f"Password reset successfully for user: {user_id}") return True, "Password reset successfully" else: logger.error(f"Failed to update password for user: {user_id}") return False, "Failed to reset password" except Exception as e: logger.error(f"Error resetting password: {e}") return False, "An error occurred while resetting password" def convert_to_user_info_response(self, user: SystemUserModel) -> UserInfoResponse: """Convert SystemUserModel to UserInfoResponse.""" return UserInfoResponse( user_id=user.user_id, username=user.username, email=user.email, merchant_id=user.merchant_id, merchant_type=user.merchant_type, full_name=user.full_name, role=user.role, permissions=user.permissions, status=user.status, last_login_at=user.last_login_at, profile_picture_url=user.profile_picture_url ) async def get_all_roles(self) -> List[dict]: """Get all access roles from database.""" try: cursor = self.db[AUTH_ACCESS_ROLES_COLLECTION].find({}) roles = await cursor.to_list(length=None) return roles except Exception as e: logger.error(f"Error fetching access roles: {e}") return [] async def get_scm_permissions_by_role(self, role: str) -> Optional[Dict[str, Any]]: """ Fetch permissions from SCM access roles collection based on user role. Maps user role values to SCM role_ids. Args: role: User role value (e.g., "super_admin", "admin") Returns: Permissions dict or None if not found """ try: # Map user role values to SCM role_ids role_to_scm_id = { "super_admin": "role_super_admin", "admin": "role_company_admin", "manager": "role_cnf_manager", "user": "role_retail_owner", "read_only": "role_retail_staff" } # Get the SCM role_id scm_role_id = role_to_scm_id.get(role) if not scm_role_id: logger.warning(f"No SCM mapping for role: {role}, trying direct lookup") scm_role_id = role # Fetch the role from SCM access roles collection scm_role = await self.db[SCM_ACCESS_ROLES_COLLECTION].find_one( {"role_id": scm_role_id, "is_active": True} ) if scm_role and "permissions" in scm_role: logger.info(f"✅ Fetched SCM permissions for {role} -> {scm_role_id}: {scm_role.get('role_name')}") return scm_role["permissions"] else: logger.warning(f"❌ No permissions found in scm_access_roles for role_id: {scm_role_id}") return None except Exception as e: logger.error(f"Error fetching SCM permissions for role {role}: {e}") return None