Spaces:
Running
Running
| """ | |
| System User service for authentication and user management. | |
| """ | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from uuid import UUID | |
| import uuid | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from passlib.context import CryptContext | |
| from jose import JWTError, jwt | |
| from fastapi import HTTPException, status | |
| import aiohttp | |
| from app.system_users.models.model import ( | |
| SystemUserModel, | |
| UserStatus, | |
| UserRole, | |
| LoginAttemptModel, | |
| SecuritySettingsModel | |
| ) | |
| from app.system_users.schemas.schema import ( | |
| CreateUserRequest, | |
| UpdateUserRequest, | |
| ChangePasswordRequest, | |
| UserInfoResponse | |
| ) | |
| from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION, SCM_ACCESS_ROLES_COLLECTION | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
| from app.utils.email_service import email_service | |
| logger = get_logger(__name__) | |
| # Password hashing context | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| # JWT settings - using settings from config | |
| class SystemUserService: | |
| """Service class for system user operations.""" | |
| def __init__(self, db: AsyncIOMotorDatabase): | |
| self.db = db | |
| self.collection = db[AUTH_SYSTEM_USERS_COLLECTION] | |
| async def get_merchant_info(self, merchant_id: str) -> Optional[Dict[str, Any]]: | |
| """Get merchant information from SCM database.""" | |
| try: | |
| # Try to get from SCM database directly | |
| # Assuming both services use the same MongoDB instance but different databases | |
| scm_db_name = "cuatrolabs_scm" # SCM database name | |
| scm_db = self.collection.database.client.get_database(scm_db_name) | |
| scm_merchants_collection = scm_db["scm_merchants"] | |
| merchant = await scm_merchants_collection.find_one( | |
| {"merchant_id": merchant_id}, | |
| {"merchant_id": 1, "merchant_type": 1, "merchant_name": 1} | |
| ) | |
| if merchant: | |
| logger.info(f"Found merchant {merchant_id} with type {merchant.get('merchant_type')}") | |
| return merchant | |
| else: | |
| logger.warning(f"Merchant {merchant_id} not found in SCM database") | |
| return None | |
| except Exception as e: | |
| logger.warning(f"Error fetching merchant info from SCM database: {e}") | |
| return None | |
| def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| """Verify a password against its hash.""" | |
| try: | |
| return pwd_context.verify(plain_password, hashed_password) | |
| except Exception as e: | |
| logger.error(f"Error verifying password: {e}") | |
| return False | |
| def get_password_hash(password: str) -> str: | |
| """Generate password hash.""" | |
| return pwd_context.hash(password) | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| """Create JWT access token.""" | |
| to_encode = data.copy() | |
| # Convert UUID objects to strings for JSON serialization | |
| for key, value in to_encode.items(): | |
| if isinstance(value, UUID): | |
| to_encode[key] = str(value) | |
| if expires_delta: | |
| expire = datetime.utcnow() + expires_delta | |
| else: | |
| expire = datetime.utcnow() + timedelta(hours=settings.TOKEN_EXPIRATION_HOURS) | |
| # Only set type to "access" if not already specified in data | |
| if "type" not in to_encode: | |
| to_encode["type"] = "access" | |
| to_encode["exp"] = expire | |
| encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) | |
| return encoded_jwt | |
| def create_refresh_token(data: dict) -> str: | |
| """Create JWT refresh token.""" | |
| to_encode = data.copy() | |
| # Convert UUID objects to strings for JSON serialization | |
| for key, value in to_encode.items(): | |
| if isinstance(value, UUID): | |
| to_encode[key] = str(value) | |
| expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS) | |
| to_encode.update({"exp": expire, "type": "refresh"}) | |
| encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) | |
| return encoded_jwt | |
| def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]: | |
| """Verify JWT token and return payload.""" | |
| try: | |
| payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) | |
| if payload.get("type") != token_type: | |
| return None | |
| return payload | |
| except JWTError as e: | |
| logger.warning(f"Token verification failed: {e}") | |
| return None | |
| async def get_user_by_id(self, user_id: UUID) -> Optional[SystemUserModel]: | |
| """Get user by user_id.""" | |
| try: | |
| user_doc = await self.collection.find_one({"user_id": str(user_id)}) | |
| if user_doc: | |
| return SystemUserModel(**user_doc) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting user by ID {user_id}: {e}") | |
| return None | |
| async def get_user_by_username(self, username: str) -> Optional[SystemUserModel]: | |
| """Get user by username.""" | |
| try: | |
| user_doc = await self.collection.find_one({"username": username.lower()}) | |
| if user_doc: | |
| return SystemUserModel(**user_doc) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting user by username {username}: {e}") | |
| return None | |
| async def get_user_by_email(self, email: str) -> Optional[SystemUserModel]: | |
| """Get user by email.""" | |
| try: | |
| user_doc = await self.collection.find_one({"email": email.lower()}) | |
| if user_doc: | |
| return SystemUserModel(**user_doc) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting user by email {email}: {e}") | |
| return None | |
| async def get_user_by_phone(self, phone: str) -> Optional[SystemUserModel]: | |
| """Get user by phone number.""" | |
| try: | |
| user_doc = await self.collection.find_one({"phone": phone}) | |
| if user_doc: | |
| return SystemUserModel(**user_doc) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error getting user by phone {phone}: {e}") | |
| return None | |
| async def create_user(self, user_data: CreateUserRequest, created_by: str) -> SystemUserModel: | |
| """Create a new user.""" | |
| try: | |
| # Check if username or email already exists | |
| existing_user = await self.get_user_by_username(user_data.username) | |
| if existing_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Username already exists" | |
| ) | |
| existing_email = await self.get_user_by_email(user_data.email) | |
| if existing_email: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Email already exists" | |
| ) | |
| # Generate user ID | |
| # user_id = f"usr_{secrets.token_urlsafe(16)}" | |
| user_id=str(uuid.uuid4()) | |
| # Hash password | |
| password_hash = self.get_password_hash(user_data.password) | |
| # Get merchant_type if not provided | |
| merchant_type = user_data.merchant_type | |
| if not merchant_type and user_data.merchant_id: | |
| merchant_info = await self.get_merchant_info(user_data.merchant_id) | |
| if merchant_info: | |
| merchant_type = merchant_info.get('merchant_type') | |
| logger.info(f"Fetched merchant_type '{merchant_type}' for merchant_id '{user_data.merchant_id}'") | |
| # Create user model | |
| user_model = SystemUserModel( | |
| user_id=str(user_id), | |
| username=user_data.username.lower(), | |
| email=user_data.email.lower(), | |
| merchant_id=user_data.merchant_id, | |
| merchant_type=merchant_type, | |
| password_hash=password_hash, | |
| first_name=user_data.first_name, | |
| last_name=user_data.last_name, | |
| phone=user_data.phone, | |
| role=user_data.role, | |
| permissions=user_data.permissions, | |
| status=UserStatus.ACTIVE, # Set as active by default | |
| created_by=str(created_by), | |
| created_at=datetime.utcnow() | |
| ) | |
| doc = user_model.model_dump(by_alias=True, exclude_none=True) | |
| # safety: convert UUID -> str if any slips through | |
| for k, v in doc.items(): | |
| if isinstance(v, uuid.UUID): | |
| doc[k] = str(v) | |
| await self.collection.insert_one(doc) | |
| logger.info(f"User created successfully: {user_id}") | |
| return user_model | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error creating user: {e}") | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail="Failed to create user" | |
| ) | |
| async def authenticate_user(self, email_or_phone: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Tuple[Optional[SystemUserModel], str]: | |
| """Authenticate user with email/phone/username and password.""" | |
| try: | |
| # Get user by email, phone, or username | |
| user = await self.get_user_by_email(email_or_phone) | |
| if not user: | |
| user = await self.get_user_by_phone(email_or_phone) | |
| if not user: | |
| user = await self.get_user_by_username(email_or_phone) | |
| # Record failed attempt if user not found | |
| if not user: | |
| logger.warning(f"Login attempt with non-existent email/phone/username: {email_or_phone}") | |
| return None, "Invalid email, phone number, or username" | |
| # Check if account is locked | |
| if (user.security_settings.account_locked_until and | |
| user.security_settings.account_locked_until > datetime.utcnow()): | |
| return None, f"Account is locked until {user.security_settings.account_locked_until}" | |
| # Check account status | |
| if user.status not in [UserStatus.ACTIVE]: | |
| return None, f"Account is {user.status.value}" | |
| # Verify password | |
| if not self.verify_password(password, user.password_hash): | |
| await self._record_failed_login(user, ip_address, user_agent) | |
| return None, "Invalid username or password" | |
| # Password correct - reset failed attempts and record successful login | |
| await self._record_successful_login(user, ip_address, user_agent) | |
| return user, "Authentication successful" | |
| except Exception as e: | |
| logger.error(f"Error during authentication: {e}") | |
| return None, "Authentication failed" | |
| async def _record_failed_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]): | |
| """Record failed login attempt and update security settings.""" | |
| try: | |
| failed_attempts = user.security_settings.failed_login_attempts + 1 | |
| now = datetime.utcnow() | |
| # Add login attempt to history | |
| login_attempt = LoginAttemptModel( | |
| timestamp=now, | |
| ip_address=ip_address, | |
| user_agent=user_agent, | |
| success=False, | |
| failure_reason="Invalid password" | |
| ) | |
| # Keep only last 10 attempts | |
| attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt] | |
| update_data = { | |
| "security_settings.failed_login_attempts": failed_attempts, | |
| "security_settings.last_failed_login": now, | |
| "security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history], | |
| "updated_at": now | |
| } | |
| # Lock account if too many failed attempts | |
| if failed_attempts >= settings.MAX_FAILED_LOGIN_ATTEMPTS: | |
| lock_until = now + timedelta(minutes=settings.ACCOUNT_LOCK_DURATION_MINUTES) | |
| update_data["security_settings.account_locked_until"] = lock_until | |
| logger.warning(f"Account locked due to failed login attempts: {user.user_id}") | |
| await self.collection.update_one( | |
| {"user_id": user.user_id}, | |
| {"$set": update_data} | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error recording failed login: {e}") | |
| async def _record_successful_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]): | |
| """Record successful login and reset security counters.""" | |
| try: | |
| now = datetime.utcnow() | |
| # Add login attempt to history | |
| login_attempt = LoginAttemptModel( | |
| timestamp=now, | |
| ip_address=ip_address, | |
| user_agent=user_agent, | |
| success=True | |
| ) | |
| # Keep only last 10 attempts | |
| attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt] | |
| await self.collection.update_one( | |
| {"user_id": user.user_id}, | |
| {"$set": { | |
| "last_login_at": now, | |
| "last_login_ip": ip_address, | |
| "security_settings.failed_login_attempts": 0, | |
| "security_settings.last_failed_login": None, | |
| "security_settings.account_locked_until": None, | |
| "security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history], | |
| "updated_at": now | |
| }} | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error recording successful login: {e}") | |
| async def record_logout(self, user: SystemUserModel, ip_address: Optional[str] = None, user_agent: Optional[str] = None): | |
| """ | |
| Record logout action for audit purposes. | |
| This logs the logout event to both the application logs and optionally to an audit collection. | |
| Args: | |
| user: The user who is logging out | |
| ip_address: IP address from which logout occurred | |
| user_agent: User agent string of the client | |
| """ | |
| try: | |
| now = datetime.utcnow() | |
| # Calculate session duration if last login is available | |
| session_duration = None | |
| if user.last_login_at: | |
| session_duration = (now - user.last_login_at).total_seconds() | |
| # Create audit log entry | |
| audit_log = { | |
| "event_type": "logout", | |
| "user_id": user.user_id, | |
| "username": user.username, | |
| "email": user.email, | |
| "merchant_id": user.merchant_id, | |
| "merchant_type": user.merchant_type, | |
| "role": user.role, | |
| "timestamp": now, | |
| "ip_address": ip_address, | |
| "user_agent": user_agent, | |
| "session_duration_seconds": session_duration, | |
| "last_login_at": user.last_login_at | |
| } | |
| # Log to application logs with structured data | |
| logger.info( | |
| f"User logout: {user.username}", | |
| extra={ | |
| "event": "user_logout", | |
| "user_id": user.user_id, | |
| "username": user.username, | |
| "email": user.email, | |
| "merchant_id": user.merchant_id, | |
| "merchant_type": user.merchant_type, | |
| "role": user.role, | |
| "ip_address": ip_address, | |
| "user_agent": user_agent, | |
| "session_duration": session_duration, | |
| "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None | |
| } | |
| ) | |
| # Store in audit logs collection for compliance and analysis | |
| try: | |
| auth_logs_collection = self.db["scm_auth_logs"] | |
| await auth_logs_collection.insert_one(audit_log) | |
| logger.debug(f"Logout audit record stored for user {user.user_id}") | |
| except Exception as audit_error: | |
| # Don't fail logout if audit logging fails | |
| logger.error( | |
| f"Failed to store logout audit log: {audit_error}", | |
| extra={"user_id": user.user_id} | |
| ) | |
| except Exception as e: | |
| logger.error( | |
| f"Error recording logout audit: {e}", | |
| extra={"user_id": user.user_id if user else None}, | |
| exc_info=True | |
| ) | |
| def is_password_expired(self, user: SystemUserModel) -> bool: | |
| """ | |
| Check if user's password has expired based on rotation policy. | |
| Args: | |
| user: The system user model | |
| Returns: | |
| bool: True if password is expired, False otherwise | |
| """ | |
| if not user.security_settings.last_password_change: | |
| # If no password change record, consider it expired (initial setup) | |
| return True | |
| now = datetime.utcnow() | |
| days_since_change = (now - user.security_settings.last_password_change).days | |
| return days_since_change >= settings.PASSWORD_ROTATION_DAYS | |
| def get_password_age_days(self, user: SystemUserModel) -> int: | |
| """ | |
| Get the age of user's password in days. | |
| Args: | |
| user: The system user model | |
| Returns: | |
| int: Number of days since last password change | |
| """ | |
| if not user.security_settings.last_password_change: | |
| return -1 # Never changed | |
| now = datetime.utcnow() | |
| return (now - user.security_settings.last_password_change).days | |
| def get_password_rotation_status(self, user: SystemUserModel) -> Dict[str, Any]: | |
| """ | |
| Get comprehensive password rotation status for a user. | |
| Args: | |
| user: The system user model | |
| Returns: | |
| Dict: Password rotation status information | |
| """ | |
| password_age = self.get_password_age_days(user) | |
| is_expired = self.is_password_expired(user) | |
| days_until_expiry = settings.PASSWORD_ROTATION_DAYS - password_age | |
| warning_threshold = settings.PASSWORD_ROTATION_WARNING_DAYS | |
| status = "expired" if is_expired else "active" | |
| if password_age >= 0 and days_until_expiry <= warning_threshold: | |
| status = "warning" | |
| return { | |
| "password_status": status, | |
| "password_age_days": password_age, | |
| "password_rotation_days_required": settings.PASSWORD_ROTATION_DAYS, | |
| "days_until_expiry": max(0, days_until_expiry), | |
| "last_password_change": user.security_settings.last_password_change, | |
| "requires_password_change": is_expired and settings.ENFORCE_PASSWORD_ROTATION, | |
| "warning_threshold_days": warning_threshold | |
| } | |
| async def mark_password_change_required(self, user_id: UUID, reason: str = "password_rotation") -> bool: | |
| """ | |
| Mark that a user must change their password. | |
| Args: | |
| user_id: User ID | |
| reason: Reason for requiring password change | |
| Returns: | |
| bool: Success status | |
| """ | |
| try: | |
| now = datetime.utcnow() | |
| audit_log = { | |
| "event_type": "password_change_required", | |
| "user_id": user_id, | |
| "reason": reason, | |
| "timestamp": now, | |
| "enforced_at": now | |
| } | |
| # Store audit log | |
| try: | |
| auth_logs_collection = self.db["scm_auth_logs"] | |
| await auth_logs_collection.insert_one(audit_log) | |
| except Exception as audit_error: | |
| logger.warning(f"Failed to store password change audit log: {audit_error}") | |
| # Update user record | |
| await self.collection.update_one( | |
| {"user_id": user_id}, | |
| {"$set": { | |
| "security_settings.require_password_change": True, | |
| "updated_at": now | |
| }} | |
| ) | |
| logger.info( | |
| f"Password change marked as required for user {user_id}", | |
| extra={"reason": reason, "user_id": user_id} | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error marking password change required for {user_id}: {e}") | |
| return False | |
| async def record_password_change(self, user_id: UUID, changed_by: str = "user") -> bool: | |
| """ | |
| Record password change for rotation policy compliance. | |
| Args: | |
| user_id: User ID | |
| changed_by: Who initiated the change (user, admin, system) | |
| Returns: | |
| bool: Success status | |
| """ | |
| try: | |
| now = datetime.utcnow() | |
| audit_log = { | |
| "event_type": "password_changed", | |
| "user_id": user_id, | |
| "changed_by": changed_by, | |
| "timestamp": now, | |
| "previous_rotation_status": None | |
| } | |
| # Get user's previous status | |
| user = await self.get_user_by_id(user_id) | |
| if user: | |
| audit_log["previous_rotation_status"] = self.get_password_rotation_status(user) | |
| # Store audit log | |
| try: | |
| auth_logs_collection = self.db["scm_auth_logs"] | |
| await auth_logs_collection.insert_one(audit_log) | |
| except Exception as audit_error: | |
| logger.warning(f"Failed to store password change audit log: {audit_error}") | |
| logger.info( | |
| f"Password change recorded for user {user_id}", | |
| extra={ | |
| "event": "password_changed", | |
| "user_id": user_id, | |
| "changed_by": changed_by | |
| } | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error recording password change for {user_id}: {e}") | |
| return False | |
| """Update user information.""" | |
| try: | |
| user = await self.get_user_by_id(user_id) | |
| if not user: | |
| return None | |
| update_dict = {} | |
| for field, value in update_data.dict(exclude_unset=True).items(): | |
| if value is not None: | |
| update_dict[field] = value | |
| if update_dict: | |
| update_dict["updated_at"] = datetime.utcnow() | |
| update_dict["updated_by"] = updated_by | |
| await self.collection.update_one( | |
| {"user_id": user_id}, | |
| {"$set": update_dict} | |
| ) | |
| return await self.get_user_by_id(user_id) | |
| except Exception as e: | |
| logger.error(f"Error updating user {user_id}: {e}") | |
| return None | |
| async def change_password(self, user_id: UUID, current_password: str, new_password: str) -> bool: | |
| """Change user password.""" | |
| try: | |
| user = await self.get_user_by_id(user_id) | |
| if not user: | |
| return False | |
| # Verify current password | |
| if not self.verify_password(current_password, user.password_hash): | |
| return False | |
| # Hash new password | |
| new_password_hash = self.get_password_hash(new_password) | |
| now = datetime.utcnow() | |
| # Update password | |
| await self.collection.update_one( | |
| {"user_id": str(user_id)}, | |
| {"$set": { | |
| "password_hash": new_password_hash, | |
| "security_settings.last_password_change": datetime.utcnow(), | |
| "security_settings.require_password_change": False, | |
| "last_login_at": now, | |
| "updated_at": datetime.utcnow() | |
| }} | |
| ) | |
| # Record password change for rotation policy compliance | |
| await self.record_password_change(user_id, changed_by="user") | |
| logger.info(f"Password changed for user: {user_id}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error changing password for user {user_id}: {e}") | |
| return False | |
| async def list_users(self, page: int = 1, page_size: int = 20, status_filter: Optional[UserStatus] = None) -> Tuple[List[SystemUserModel], int]: | |
| """List users with pagination.""" | |
| try: | |
| skip = (page - 1) * page_size | |
| # Build query filter | |
| query_filter = {} | |
| if status_filter: | |
| query_filter["status"] = status_filter.value | |
| # Get total count | |
| total_count = await self.collection.count_documents(query_filter) | |
| # Get users - don't exclude password_hash to avoid validation errors | |
| cursor = self.collection.find(query_filter).skip(skip).limit(page_size).sort("created_at", -1) | |
| users = [] | |
| async for user_doc in cursor: | |
| users.append(SystemUserModel(**user_doc)) | |
| return users, total_count | |
| except Exception as e: | |
| logger.error(f"Error listing users: {e}") | |
| return [], 0 | |
| async def list_users_with_projection( | |
| self, | |
| filters: Optional[Dict[str, Any]] = None, | |
| skip: int = 0, | |
| limit: int = 100, | |
| projection_list: Optional[List[str]] = None, | |
| status_filter: Optional[UserStatus] = None, | |
| role_filter: Optional[str] = None, | |
| merchant_id_filter: Optional[str] = None, | |
| merchant_type_filter: Optional[str] = None | |
| ): | |
| """ | |
| List users with projection support following API standards. | |
| Args: | |
| filters: Additional filter criteria | |
| skip: Number of records to skip | |
| limit: Maximum number of records to return | |
| projection_list: List of fields to include in response | |
| status_filter: Filter by user status | |
| role_filter: Filter by user role | |
| merchant_id_filter: Filter by merchant ID | |
| merchant_type_filter: Filter by merchant type | |
| Returns: | |
| List of users (raw dict if projection, SystemUserModel otherwise) | |
| """ | |
| try: | |
| # Build query filter | |
| query_filter = {} | |
| # Add specific filters | |
| if status_filter: | |
| query_filter["status"] = status_filter.value | |
| if role_filter: | |
| query_filter["role"] = role_filter | |
| if merchant_id_filter: | |
| query_filter["merchant_id"] = merchant_id_filter | |
| if merchant_type_filter: | |
| query_filter["merchant_type"] = merchant_type_filter | |
| # Add additional filters | |
| if filters: | |
| query_filter.update(filters) | |
| # Build MongoDB projection | |
| projection_dict = None | |
| if projection_list: | |
| projection_dict = {field: 1 for field in projection_list} | |
| projection_dict["_id"] = 0 | |
| # Query with projection | |
| cursor = self.collection.find(query_filter, projection_dict).skip(skip).limit(limit).sort("created_at", -1) | |
| docs = await cursor.to_list(length=limit) | |
| # Return raw dict if projection, model otherwise | |
| if projection_list: | |
| return docs | |
| else: | |
| return [SystemUserModel(**doc) for doc in docs] | |
| except Exception as e: | |
| logger.error(f"Error listing users with projection: {e}") | |
| return [] | |
| async def deactivate_user(self, user_id: str, deactivated_by: str) -> bool: | |
| """Deactivate user account.""" | |
| try: | |
| result = await self.collection.update_one( | |
| {"user_id": user_id}, | |
| {"$set": { | |
| "status": UserStatus.INACTIVE.value, | |
| "updated_at": datetime.utcnow(), | |
| "updated_by": deactivated_by | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"User deactivated: {user_id}") | |
| return True | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error deactivating user {user_id}: {e}") | |
| return False | |
| async def create_password_reset_token(self, email: str) -> Optional[str]: | |
| """ | |
| Create a password reset token for a user. | |
| Args: | |
| email: User's email address | |
| Returns: | |
| Reset token string or None if user not found | |
| """ | |
| try: | |
| # Get user by email | |
| user = await self.get_user_by_email(email) | |
| if not user: | |
| logger.warning(f"Password reset requested for non-existent email: {email}") | |
| # Don't reveal that user doesn't exist for security | |
| return None | |
| # Check if account is active | |
| if user.status not in [UserStatus.ACTIVE, UserStatus.PENDING_ACTIVATION]: | |
| logger.warning(f"Password reset requested for {user.status.value} account: {email}") | |
| return None | |
| # Generate secure token | |
| reset_token = secrets.token_urlsafe(32) | |
| # Create JWT token with expiration | |
| token_data = { | |
| "sub": user.user_id, | |
| "email": user.email, | |
| "type": "password_reset", | |
| "token": reset_token, | |
| "exp": datetime.utcnow() + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRATION_MINUTES) | |
| } | |
| jwt_token = jwt.encode(token_data, settings.SECRET_KEY, algorithm=settings.ALGORITHM) | |
| # Store reset token info in database | |
| await self.collection.update_one( | |
| {"user_id": user.user_id}, | |
| {"$set": { | |
| "security_settings.password_reset_token": reset_token, | |
| "security_settings.password_reset_token_created_at": datetime.utcnow(), | |
| "updated_at": datetime.utcnow() | |
| }} | |
| ) | |
| logger.info(f"Password reset token created for user: {user.user_id}") | |
| return jwt_token | |
| except Exception as e: | |
| logger.error(f"Error creating password reset token: {e}") | |
| return None | |
| async def send_password_reset_email(self, email: str) -> bool: | |
| """ | |
| Generate password reset token and send reset email. | |
| Args: | |
| email: User's email address | |
| Returns: | |
| True if email was sent successfully, False otherwise | |
| """ | |
| try: | |
| # Get user | |
| user = await self.get_user_by_email(email) | |
| if not user: | |
| # Don't reveal that user doesn't exist | |
| logger.warning(f"Password reset email requested for non-existent email: {email}") | |
| return True # Return true to not leak information | |
| # Create reset token | |
| reset_token = await self.create_password_reset_token(email) | |
| if not reset_token: | |
| logger.error(f"Failed to create reset token for: {email}") | |
| return False | |
| # Build reset link | |
| reset_link = f"{settings.PASSWORD_RESET_BASE_URL}?token={reset_token}" | |
| # Send email | |
| email_sent = await email_service.send_password_reset_email( | |
| to_email=user.email, | |
| reset_link=reset_link, | |
| user_name=user.first_name | |
| ) | |
| if email_sent: | |
| logger.info(f"Password reset email sent to: {email}") | |
| else: | |
| logger.error(f"Failed to send password reset email to: {email}") | |
| return email_sent | |
| except Exception as e: | |
| logger.error(f"Error sending password reset email: {e}") | |
| return False | |
| async def verify_password_reset_token(self, token: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Verify password reset token and return user info. | |
| Args: | |
| token: JWT reset token | |
| Returns: | |
| Dict with user_id and email if valid, None otherwise | |
| """ | |
| try: | |
| # Decode JWT token | |
| payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) | |
| # Check token type | |
| if payload.get("type") != "password_reset": | |
| logger.warning("Invalid token type for password reset") | |
| return None | |
| user_id = payload.get("sub") | |
| email = payload.get("email") | |
| reset_token = payload.get("token") | |
| if not all([user_id, email, reset_token]): | |
| logger.warning("Missing required fields in reset token") | |
| return None | |
| # Get user from database | |
| user = await self.get_user_by_id(user_id) | |
| if not user: | |
| logger.warning(f"User not found for reset token: {user_id}") | |
| return None | |
| # Verify email matches | |
| if user.email != email: | |
| logger.warning(f"Email mismatch in reset token for user: {user_id}") | |
| return None | |
| # Verify token matches stored token | |
| stored_token = user.security_settings.password_reset_token | |
| if not stored_token or stored_token != reset_token: | |
| logger.warning(f"Reset token mismatch for user: {user_id}") | |
| return None | |
| # Check if token was used (token should be cleared after use) | |
| if not user.security_settings.password_reset_token: | |
| logger.warning(f"Reset token already used for user: {user_id}") | |
| return None | |
| return { | |
| "user_id": user_id, | |
| "email": email, | |
| "token": reset_token | |
| } | |
| except JWTError as e: | |
| logger.warning(f"Invalid or expired reset token: {e}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error verifying reset token: {e}") | |
| return None | |
| async def reset_password_with_token(self, token: str, new_password: str) -> Tuple[bool, str]: | |
| """ | |
| Reset user password using reset token. | |
| Args: | |
| token: JWT reset token | |
| new_password: New password to set | |
| Returns: | |
| Tuple of (success: bool, message: str) | |
| """ | |
| try: | |
| # Verify token | |
| token_data = await self.verify_password_reset_token(token) | |
| if not token_data: | |
| return False, "Invalid or expired reset token" | |
| user_id = token_data["user_id"] | |
| # Hash new password | |
| new_password_hash = self.get_password_hash(new_password) | |
| # Update password and clear reset token | |
| result = await self.collection.update_one( | |
| {"user_id": user_id}, | |
| {"$set": { | |
| "password_hash": new_password_hash, | |
| "security_settings.password_reset_token": None, | |
| "security_settings.password_reset_token_created_at": None, | |
| "security_settings.last_password_change": datetime.utcnow(), | |
| "security_settings.require_password_change": False, | |
| "security_settings.failed_login_attempts": 0, | |
| "security_settings.account_locked_until": None, | |
| "updated_at": datetime.utcnow() | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Password reset successfully for user: {user_id}") | |
| return True, "Password reset successfully" | |
| else: | |
| logger.error(f"Failed to update password for user: {user_id}") | |
| return False, "Failed to reset password" | |
| except Exception as e: | |
| logger.error(f"Error resetting password: {e}") | |
| return False, "An error occurred while resetting password" | |
| def convert_to_user_info_response(self, user: SystemUserModel) -> UserInfoResponse: | |
| """Convert SystemUserModel to UserInfoResponse.""" | |
| return UserInfoResponse( | |
| user_id=user.user_id, | |
| username=user.username, | |
| email=user.email, | |
| merchant_id=user.merchant_id, | |
| merchant_type=user.merchant_type, | |
| full_name=user.full_name, | |
| role=user.role, | |
| permissions=user.permissions, | |
| status=user.status, | |
| last_login_at=user.last_login_at, | |
| profile_picture_url=user.profile_picture_url | |
| ) | |
| async def get_all_roles(self) -> List[dict]: | |
| """Get all access roles from database.""" | |
| try: | |
| cursor = self.db[AUTH_ACCESS_ROLES_COLLECTION].find({}) | |
| roles = await cursor.to_list(length=None) | |
| return roles | |
| except Exception as e: | |
| logger.error(f"Error fetching access roles: {e}") | |
| return [] | |
| async def get_scm_permissions_by_role(self, role: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Fetch permissions from SCM access roles collection based on user role. | |
| Maps user role values to SCM role_ids. | |
| Args: | |
| role: User role value (e.g., "super_admin", "admin") | |
| Returns: | |
| Permissions dict or None if not found | |
| """ | |
| try: | |
| # Map user role values to SCM role_ids | |
| role_to_scm_id = { | |
| "super_admin": "role_super_admin", | |
| "admin": "role_company_admin", | |
| "manager": "role_cnf_manager", | |
| "user": "role_retail_owner", | |
| "read_only": "role_retail_staff" | |
| } | |
| # Get the SCM role_id | |
| scm_role_id = role_to_scm_id.get(role) | |
| if not scm_role_id: | |
| logger.warning(f"No SCM mapping for role: {role}, trying direct lookup") | |
| scm_role_id = role | |
| # Fetch the role from SCM access roles collection | |
| scm_role = await self.db[SCM_ACCESS_ROLES_COLLECTION].find_one( | |
| {"role_id": scm_role_id, "is_active": True} | |
| ) | |
| if scm_role and "permissions" in scm_role: | |
| logger.info(f"✅ Fetched SCM permissions for {role} -> {scm_role_id}: {scm_role.get('role_name')}") | |
| return scm_role["permissions"] | |
| else: | |
| logger.warning(f"❌ No permissions found in scm_access_roles for role_id: {scm_role_id}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error fetching SCM permissions for role {role}: {e}") | |
| return None |