Spaces:
Runtime error
Runtime error
| """ | |
| System User service for authentication and user management. | |
| """ | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Tuple, Dict, Any | |
| import uuid | |
| from motor.motor_asyncio import AsyncIOMotorDatabase | |
| from passlib.context import CryptContext | |
| from jose import JWTError, jwt | |
| from fastapi import HTTPException, status | |
| from app.core.logging import get_logger | |
| from app.system_users.models.model import SystemUserModel, UserStatus | |
| from app.system_users.schemas.schema import ( | |
| CreateUserRequest, | |
| UpdateUserRequest, | |
| ChangePasswordRequest, | |
| UserInfoResponse, | |
| SecuritySettings, | |
| ) | |
| from app.system_users.constants import SCM_SYSTEM_USERS_COLLECTION | |
| from app.access_roles.constants import SCM_ACCESS_ROLES_COLLECTION | |
| from app.merchants.constants import SCM_MERCHANTS_COLLECTION | |
| from app.employees.constants import SCM_EMPLOYEES_COLLECTION | |
| from app.core.config import settings | |
# JWT configuration: signing algorithm and default token lifetimes.
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Logger for this module, obtained from the application's logging helper.
logger = get_logger(__name__)

# passlib context used to hash and verify passwords with bcrypt.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
| class SystemUserService: | |
| """Service class for system user operations.""" | |
def __init__(self, db: AsyncIOMotorDatabase):
    """Store the database handle and resolve the system-users collection.

    Args:
        db: Database handle; indexed with collection names to obtain
            collections.
    """
    users_collection = db[SCM_SYSTEM_USERS_COLLECTION]
    self.db = db
    self.collection = users_collection
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Declared as a static method because the function takes no ``self``;
    other methods of this class call it as
    ``self.verify_password(password, password_hash)``, which only works
    when the instance is not injected as an extra first argument.

    Args:
        plain_password: Password as supplied by the caller.
        hashed_password: Hash to verify against.

    Returns:
        The result of ``pwd_context.verify``; ``False`` if verification
        raises (the error is logged).
    """
    try:
        return pwd_context.verify(plain_password, hashed_password)
    except Exception as e:
        # A failing verification backend is treated as a failed match
        # rather than propagated to the caller.
        logger.error(
            "Error verifying password",
            extra={"event": "system_user_password_verify_failed", "error": str(e)},
            exc_info=True,
        )
        return False
@staticmethod
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module's password context.

    Declared as a static method because the function takes no ``self``
    while other methods call it as ``self.get_password_hash(password)``.

    Args:
        password: Plaintext password to hash.

    Returns:
        The hash produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Declared as a static method because the function takes no ``self``.

    Args:
        data: Claims to embed. The dict is copied, not mutated; any
            ``exp`` or ``type`` keys it contains are overwritten.
        expires_delta: Token lifetime. When ``None`` the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used. An explicit
            zero lifetime is honoured rather than replaced by the default.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY`` using
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently fall back to the default lifetime.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    to_encode.update({"exp": datetime.utcnow() + lifetime, "type": "access"})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
@staticmethod
def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token.

    Declared as a static method because the function takes no ``self``.

    Args:
        data: Claims to embed. The dict is copied, not mutated; any
            ``exp`` or ``type`` keys it contains are overwritten.

    Returns:
        The encoded token, expiring ``REFRESH_TOKEN_EXPIRE_DAYS`` days
        from now and tagged with ``type == "refresh"``.
    """
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
@staticmethod
def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its payload if it has the expected type.

    Declared as a static method because the function takes no ``self``.

    Args:
        token: Encoded JWT.
        token_type: Value the payload's ``type`` claim must equal.

    Returns:
        The decoded payload, or ``None`` when decoding raises ``JWTError``
        (logged as a warning) or the ``type`` claim does not match.
    """
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as e:
        logger.warning(
            "Token verification failed",
            extra={"event": "system_user_token_verify_failed", "token_type": token_type, "error": str(e)},
        )
        return None

    # Reject tokens of the wrong kind, e.g. a refresh token presented
    # where an access token is required.
    if payload.get("type") != token_type:
        return None
    return payload
async def get_user_by_id(self, user_id: str) -> Optional[SystemUserModel]:
    """Look up a single user by its ``user_id`` field.

    Args:
        user_id: Identifier to match against the ``user_id`` field.

    Returns:
        A ``SystemUserModel`` built from the matching document, or ``None``
        when nothing matches or the lookup raises (the error is logged).
    """
    try:
        document = await self.collection.find_one({"user_id": user_id})
        return SystemUserModel(**document) if document else None
    except Exception as e:
        logger.error(
            "Error getting user by ID",
            extra={"event": "system_user_get_by_id_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return None
async def get_user_by_username(self, username: str) -> Optional[SystemUserModel]:
    """Look up a single user by username.

    The username is lower-cased before querying.

    Args:
        username: Username to search for.

    Returns:
        A ``SystemUserModel`` built from the matching document, or ``None``
        when nothing matches or the lookup raises (the error is logged).
    """
    try:
        lookup = {"username": username.lower()}
        document = await self.collection.find_one(lookup)
        return SystemUserModel(**document) if document else None
    except Exception as e:
        logger.error(
            "Error getting user by username",
            extra={"event": "system_user_get_by_username_failed", "username": username, "error": str(e)},
            exc_info=True,
        )
        return None
async def get_user_by_email(self, email: str, user_id=None) -> Optional[SystemUserModel]:
    """Look up a single user by email, optionally ignoring one user.

    The email is lower-cased before querying.

    Args:
        email: Email address to search for.
        user_id: When given, a match whose ``user_id`` equals this value
            (compared as strings) is treated as "no conflict" and ``None``
            is returned instead of the user.

    Returns:
        A ``SystemUserModel`` for the matching document, or ``None`` when
        nothing matches, the match belongs to ``user_id``, or the lookup
        raises (the error is logged).
    """
    try:
        document = await self.collection.find_one({"email": email.lower()})
        if not document:
            return None

        if user_id is not None:
            owner_id = document.get("user_id")
            belongs_to_caller = bool(owner_id) and str(owner_id) == str(user_id)
            if belongs_to_caller:
                # The address is already held by the same user, so it is
                # not a duplicate.
                return None

        return SystemUserModel(**document)
    except Exception as e:
        logger.error(
            "Error getting user by email",
            extra={"event": "system_user_get_by_email_failed", "email": email, "error": str(e)},
            exc_info=True,
        )
        return None
async def create_user(self, user_data: CreateUserRequest, created_by: str, user_id=None) -> SystemUserModel:
    """Insert a new system user document.

    Args:
        user_data: Request payload carrying the new user's fields.
        created_by: Identifier recorded in ``created_by``.
        user_id: Optional identifier to use for the new user. When omitted
            a UUID4 is generated. It is also passed to the email lookup so
            that a match owned by the same ``user_id`` is not treated as a
            duplicate.

    Returns:
        The ``SystemUserModel`` that was inserted.

    Raises:
        HTTPException: 400 when the email is already taken; 500 for any
            other failure (the original error is logged).
    """
    try:
        duplicate = await self.get_user_by_email(user_data.email, user_id)
        if duplicate:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already exists",
            )

        if user_id is None:
            user_id = uuid.uuid4()
        user_id_text = str(user_id)
        hashed = self.get_password_hash(user_data.password)

        new_user = SystemUserModel(
            user_id=user_id_text,
            username=user_data.username.lower(),
            email=user_data.email.lower(),
            phone=user_data.phone,
            password_hash=hashed,
            full_name=user_data.full_name,
            role_id=user_data.role_id,
            merchant_id=str(user_data.merchant_id),
            merchant_type=user_data.merchant_type,
            status=UserStatus.ACTIVE,  # new accounts start out active
            last_login=None,
            created_by=created_by,
            created_at=datetime.utcnow(),
            metadata=user_data.metadata,
        )
        await self.collection.insert_one(new_user.model_dump())

        logger.info(
            "User created successfully",
            extra={"event": "system_user_created", "user_id": user_id_text, "created_by": created_by},
        )
        return new_user
    except HTTPException as e:
        # Deliberate HTTP errors (e.g. the duplicate-email 400) are logged
        # and passed through unchanged.
        logger.error(
            "HTTPException in create_user",
            extra={"event": "system_user_create_http_exception", "detail": str(e.detail)},
        )
        raise
    except Exception as e:
        logger.error(
            "Error creating user",
            extra={"event": "system_user_create_failed", "error": str(e)},
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create user",
        )
async def authenticate_user(self, identifier: str, password: str) -> Tuple[Optional[SystemUserModel], str]:
    """Authenticate a user by email or username.

    The identifier is tried as an email first and as a username second.
    On success the user's ``last_login`` is updated in the database and on
    the returned model.

    Args:
        identifier: Email address or username.
        password: Plaintext password to verify.

    Returns:
        ``(user, message)``. ``user`` is ``None`` when the identifier is
        unknown, the account status is not ``ACTIVE``, the password does
        not verify, or an unexpected error occurs; ``message`` describes
        the outcome.
    """
    try:
        user = await self.get_user_by_email(identifier) or await self.get_user_by_username(identifier)
        if not user:
            logger.warning(
                "Login attempt with non-existent identifier",
                extra={"event": "system_user_auth_identifier_not_found", "identifier": identifier},
            )
            return None, "Invalid email or username"

        # Only active accounts may log in.
        if user.status not in (UserStatus.ACTIVE,):
            return None, f"Account is {user.status.value}"

        password_ok = self.verify_password(password, user.password_hash)
        if not password_ok:
            return None, "Invalid username or password"

        login_time = datetime.utcnow()
        login_fields = {"last_login": login_time, "updated_at": login_time, "updated_by": user.user_id}
        await self.collection.update_one({"user_id": user.user_id}, {"$set": login_fields})
        user.last_login = login_time
        return user, "Authentication successful"
    except Exception as e:
        logger.error(
            "Error during authentication",
            extra={"event": "system_user_auth_failed", "identifier": identifier, "error": str(e)},
            exc_info=True,
        )
        return None, "Authentication failed"
async def update_user(self, user_id: str, update_data: UpdateUserRequest, updated_by: str) -> Optional[SystemUserModel]:
    """Apply a partial update to a user and return the refreshed user.

    Only fields explicitly set on ``update_data`` are written; ``email``
    and ``username`` are lower-cased first. When ``phone`` is among the
    updated fields, it is also copied to the employee document with the
    same ``user_id``; a failure of that copy is logged and does not fail
    the update.

    Args:
        user_id: Identifier of the user to update.
        update_data: Request payload with the fields to change.
        updated_by: Identifier recorded in ``updated_by``.

    Returns:
        The user as re-read after the update, or ``None`` when the user
        does not exist or an unexpected error occurs (logged).
    """
    try:
        if not await self.get_user_by_id(user_id):
            return None

        changes = update_data.model_dump(exclude_unset=True)
        for field_name in ("email", "username"):
            if field_name in changes:
                changes[field_name] = changes[field_name].lower()

        if changes:
            changes["updated_at"] = datetime.utcnow()
            changes["updated_by"] = updated_by
            await self.collection.update_one({"user_id": user_id}, {"$set": changes})

        # Mirror a changed phone number onto the employee record.
        if "phone" in changes:
            try:
                employee_fields = {
                    "phone": changes["phone"],
                    "updated_at": datetime.utcnow().isoformat(),
                    "updated_by": updated_by,
                }
                sync_result = await self.db[SCM_EMPLOYEES_COLLECTION].update_one(
                    {"user_id": user_id},
                    {"$set": employee_fields},
                )
                if sync_result.matched_count > 0:
                    logger.info(
                        "Synced phone to employee record",
                        extra={"event": "employee_phone_sync_success", "user_id": user_id},
                    )
                else:
                    logger.debug(
                        "No employee record found to sync phone",
                        extra={"event": "employee_phone_sync_no_match", "user_id": user_id},
                    )
            except Exception as sync_err:
                logger.error(
                    "Failed to sync phone to employee record",
                    extra={"event": "employee_phone_sync_failed", "user_id": user_id, "error": str(sync_err)},
                    exc_info=True,
                )

        return await self.get_user_by_id(user_id)
    except Exception as e:
        logger.error(
            "Error updating user",
            extra={"event": "system_user_update_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return None
async def change_password(self, user_id: str, current_password: str, new_password: str) -> bool:
    """Change a user's password after verifying the current one.

    Besides storing the new hash, this clears the ``must_change_password``
    flag. ``reset_password_admin`` sets that flag to ``True`` when it
    issues a temporary password, and nothing else in this service resets
    it, so without clearing it here the user would stay flagged after
    picking a new password.

    Args:
        user_id: Identifier of the user whose password is changed.
        current_password: Plaintext of the password currently in effect.
        new_password: Plaintext of the replacement password.

    Returns:
        ``True`` when the new hash was written; ``False`` when the user
        does not exist, the current password does not verify, or an
        unexpected error occurs (logged).
    """
    try:
        user = await self.get_user_by_id(user_id)
        if not user:
            return False

        if not self.verify_password(current_password, user.password_hash):
            return False

        new_password_hash = self.get_password_hash(new_password)
        await self.collection.update_one(
            {"user_id": user_id},
            {"$set": {
                "password_hash": new_password_hash,
                # The user has now chosen their own password, so the
                # forced-change marker from an admin reset no longer applies.
                "must_change_password": False,
                "updated_at": datetime.utcnow(),
                "updated_by": user_id,
            }},
        )
        logger.info(
            "Password changed for user",
            extra={"event": "system_user_password_changed", "user_id": user_id},
        )
        return True
    except Exception as e:
        logger.error(
            "Error changing password for user",
            extra={"event": "system_user_password_change_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return False
async def list_users(
    self,
    page: int = 1,
    page_size: int = 20,
    status_filter: Optional[List[UserStatus]] = None,
    projection_list: Optional[List[str]] = None,
    current_user=None,
    roles_filter: Optional[List[str]] = None,
    merchant_type_filter: Optional[List[str]] = None,
) -> Tuple[List[Any], int]:
    """List users with pagination, optional filters and field projection.

    Results are sorted by ``created_at`` descending. Unless the caller's
    role grants all of ``view``, ``create``, ``update`` and ``delete`` on
    ``system_users``, results are restricted to the caller's
    ``merchant_id`` (when the caller has one).

    Args:
        page: 1-based page number.
        page_size: Maximum number of users per page.
        status_filter: Restrict to users whose status is one of these.
        projection_list: Field names to return. When given, raw dicts are
            returned (with ``_id`` excluded) instead of models.
        current_user: The caller; its ``merchant_id`` and ``role_id`` are
            read to decide scoping. When ``None`` nothing is listed.
        roles_filter: Restrict to users whose ``role_id`` is one of these.
        merchant_type_filter: Restrict to users whose ``merchant_type`` is
            one of these.

    Returns:
        ``(users, total_count)`` where ``total_count`` is the number of
        documents matching the filter regardless of pagination. Returns
        ``([], 0)`` when ``current_user`` is missing or any error occurs
        (logged).
    """
    try:
        # The caller's identity drives merchant scoping. Without it the
        # original code failed on attribute access and returned an empty
        # page; keep that result, but make it explicit.
        if current_user is None:
            logger.warning(
                "list_users called without current_user",
                extra={"event": "system_user_list_missing_current_user"},
            )
            return [], 0

        skip = (page - 1) * page_size
        merchant_id = current_user.merchant_id

        query_filter = {}
        if status_filter is not None:
            query_filter["status"] = {"$in": [s.value for s in status_filter]}
        if roles_filter is not None:
            query_filter["role_id"] = {"$in": roles_filter}
        if merchant_type_filter is not None:
            query_filter["merchant_type"] = {"$in": merchant_type_filter}

        # Full system_users access (view+create+update+delete) lifts the
        # merchant restriction; everyone else is scoped to their merchant.
        has_full_access = False
        if current_user.role_id:
            role_doc = await self.db[SCM_ACCESS_ROLES_COLLECTION].find_one(
                {"role_id": current_user.role_id, "is_active": True},
                {"permissions.system_users": 1, "_id": 0},
            )
            if role_doc:
                # Tolerate a role whose permissions are stored as null.
                su_perms = (role_doc.get("permissions") or {}).get("system_users") or []
                has_full_access = all(
                    action in su_perms for action in ("view", "create", "update", "delete")
                )
        if not has_full_access and merchant_id is not None:
            query_filter["merchant_id"] = merchant_id

        logger.info(
            "list_users called",
            extra={
                "event": "system_user_list_called",
                "page": page,
                "page_size": page_size,
                "status_filter": [s.value for s in status_filter] if status_filter else None,
                "projection_list": projection_list,
            },
        )
        logger.info(
            "list_users database context",
            extra={"event": "system_user_list_db_context", "database": self.db.name, "collection": self.collection.name},
        )
        logger.info(
            "list_users query filter prepared",
            extra={"event": "system_user_list_query_filter", "query_filter": query_filter},
        )

        total_count = await self.collection.count_documents(query_filter)
        logger.info(
            "list_users total count computed",
            extra={"event": "system_user_list_total_count", "total_count": total_count},
        )

        # A projection always excludes _id.
        projection_dict = None
        if projection_list:
            projection_dict = {field: 1 for field in projection_list}
            projection_dict["_id"] = 0

        cursor = (
            self.collection.find(query_filter, projection_dict)
            .sort("created_at", -1)
            .skip(skip)
            .limit(page_size)
        )
        docs = await cursor.to_list(length=page_size)

        # Projected documents are partial, so they are returned as raw
        # dicts; full documents are converted to models.
        if projection_list:
            users = docs
        else:
            users = [SystemUserModel(**doc) for doc in docs]

        logger.info(
            "list_users returning users",
            extra={"event": "system_user_list_return", "returned_count": len(users)},
        )
        return users, total_count
    except Exception as e:
        logger.error(
            "Error listing users",
            extra={"event": "system_user_list_failed", "error": str(e)},
            exc_info=True,
        )
        return [], 0
async def deactivate_user(self, user_id: str, deactivated_by: str) -> bool:
    """Set a user's status to ``INACTIVE``.

    Args:
        user_id: Identifier of the user to deactivate.
        deactivated_by: Identifier recorded in ``updated_by``.

    Returns:
        ``True`` when a document was modified; ``False`` when nothing was
        modified or an error occurs (logged).
    """
    try:
        deactivation_fields = {
            "status": UserStatus.INACTIVE.value,
            "updated_at": datetime.utcnow(),
            "updated_by": deactivated_by,
        }
        outcome = await self.collection.update_one(
            {"user_id": user_id},
            {"$set": deactivation_fields},
        )
        if not (outcome.modified_count > 0):
            return False

        logger.info(
            "User deactivated",
            extra={"event": "system_user_deactivated", "user_id": user_id, "deactivated_by": deactivated_by},
        )
        return True
    except Exception as e:
        logger.error(
            "Error deactivating user",
            extra={"event": "system_user_deactivate_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return False
async def suspend_user(self, user_id: str, reason: str, suspended_by: str) -> bool:
    """Set a user's status to ``SUSPENDED`` and record why.

    Args:
        user_id: Identifier of the user to suspend.
        reason: Text stored in ``suspension_reason``.
        suspended_by: Identifier recorded in ``suspended_by`` and
            ``updated_by``.

    Returns:
        ``True`` when a document was modified; ``False`` when nothing was
        modified or an error occurs (logged).
    """
    try:
        suspension_fields = {
            "status": UserStatus.SUSPENDED.value,
            "suspension_reason": reason,
            "suspended_at": datetime.utcnow(),
            "suspended_by": suspended_by,
            "updated_at": datetime.utcnow(),
            "updated_by": suspended_by,
        }
        outcome = await self.collection.update_one(
            {"user_id": user_id},
            {"$set": suspension_fields},
        )
        if not (outcome.modified_count > 0):
            return False

        audit_context = {
            "event": "system_user_suspended",
            "user_id": user_id,
            "suspended_by": suspended_by,
            "reason": reason,
        }
        logger.info("User suspended", extra=audit_context)
        return True
    except Exception as e:
        logger.error(
            "Error suspending user",
            extra={"event": "system_user_suspend_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return False
async def reset_password_admin(self, user_id: str, reset_by: str) -> Optional[str]:
    """Replace a user's password with a temporary 6-digit PIN.

    The PIN's hash is stored, ``must_change_password`` is set, and, when
    the user has a phone number, a password-reset notification is queued.
    A failure to queue the notification is logged and does not undo the
    reset.

    Args:
        user_id: Identifier of the user whose password is reset.
        reset_by: Identifier recorded in ``password_reset_by`` and
            ``updated_by``.

    Returns:
        The temporary PIN in plaintext, or ``None`` when the user does not
        exist, no document was modified, or an error occurs (logged).
    """
    try:
        user = await self.get_user_by_id(user_id)
        if not user:
            return None

        # Six random decimal digits (numeric so it fits the Fast2SMS OTP route).
        pin_digits = [str(secrets.randbelow(10)) for _ in range(6)]
        temp_password = "".join(pin_digits)
        temp_password_hash = self.get_password_hash(temp_password)

        reset_fields = {
            "password_hash": temp_password_hash,
            "must_change_password": True,
            "password_reset_at": datetime.utcnow(),
            "password_reset_by": reset_by,
            "updated_at": datetime.utcnow(),
            "updated_by": reset_by,
        }
        outcome = await self.collection.update_one(
            {"user_id": user_id},
            {"$set": reset_fields},
        )
        if not (outcome.modified_count > 0):
            return None

        logger.info(
            "Password reset for user",
            extra={"event": "system_user_password_reset", "user_id": user_id, "reset_by": reset_by},
        )

        # Best-effort notification; the reset stands even if queuing fails.
        if user.phone:
            try:
                from app.utils.notification_queue import NotificationQueue

                merchant_name = ""
                if user.merchant_id:
                    merchant_record = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                        {"merchant_id": user.merchant_id},
                        {"merchant_name": 1, "_id": 0},
                    )
                    merchant_name = (merchant_record or {}).get("merchant_name", "")

                await NotificationQueue.send_password_reset(
                    recipient=user.phone,
                    name=user.full_name or user.username,
                    new_password=temp_password,
                    merchant_id=user.merchant_id or "",
                    merchant_name=merchant_name,
                )
                logger.info(
                    "Password reset SMS/WhatsApp notification queued",
                    extra={
                        "event": "system_user_password_reset_notification_queued",
                        "user_id": user_id,
                        "recipient": user.phone,
                    },
                )
            except Exception as notif_error:
                logger.error(
                    "Error queuing password reset notification",
                    extra={
                        "event": "system_user_password_reset_notification_queue_failed",
                        "user_id": user_id,
                        "error": str(notif_error),
                    },
                    exc_info=True,
                )

        return temp_password
    except Exception as e:
        logger.error(
            "Error resetting password for user",
            extra={"event": "system_user_password_reset_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return None
async def unlock_user(self, user_id: str, unlocked_by: str) -> bool:
    """Reactivate a user and clear lock and suspension bookkeeping.

    Sets the status to ``ACTIVE`` and removes ``account_locked_until``,
    ``failed_login_attempts``, ``suspension_reason``, ``suspended_at`` and
    ``suspended_by`` from the document.

    Args:
        user_id: Identifier of the user to unlock.
        unlocked_by: Identifier recorded in ``updated_by``.

    Returns:
        ``True`` when a document was modified; ``False`` when nothing was
        modified or an error occurs (logged).
    """
    try:
        fields_to_set = {
            "status": UserStatus.ACTIVE.value,
            "updated_at": datetime.utcnow(),
            "updated_by": unlocked_by,
        }
        fields_to_clear = {
            "account_locked_until": "",
            "failed_login_attempts": "",
            "suspension_reason": "",
            "suspended_at": "",
            "suspended_by": "",
        }
        outcome = await self.collection.update_one(
            {"user_id": user_id},
            {"$set": fields_to_set, "$unset": fields_to_clear},
        )
        if not (outcome.modified_count > 0):
            return False

        logger.info(
            "User unlocked and reactivated",
            extra={"event": "system_user_unlocked", "user_id": user_id, "unlocked_by": unlocked_by},
        )
        return True
    except Exception as e:
        logger.error(
            "Error unlocking user",
            extra={"event": "system_user_unlock_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return False
def convert_to_user_info_response(self, user: SystemUserModel) -> UserInfoResponse:
    """Build a ``UserInfoResponse`` from a ``SystemUserModel``.

    Args:
        user: Model whose attributes are copied.

    Returns:
        A response carrying the user's identity, role, merchant, status
        and audit fields; no other response fields are populated here.
    """
    copied_attributes = (
        "user_id",
        "username",
        "email",
        "full_name",
        "role_id",
        "merchant_id",
        "merchant_type",
        "status",
        "last_login",
        "created_by",
        "created_at",
    )
    response_fields = {name: getattr(user, name) for name in copied_attributes}
    return UserInfoResponse(**response_fields)
async def convert_to_user_info_response_with_merchant_and_security(
    self, user_doc: Dict[str, Any]
) -> UserInfoResponse:
    """
    Build a UserInfoResponse from a raw user document, enriched with
    merchant details and security settings.

    If the enriched conversion fails, the error is logged and a response
    without merchant and security details is built instead.

    Args:
        user_doc: User document from MongoDB

    Returns:
        UserInfoResponse with merchant_name, merchant_code and
        security_settings populated where available
    """

    def _core_fields() -> Dict[str, Any]:
        # Fields copied straight from the document; shared by the enriched
        # response and the fallback response.
        return {
            "user_id": user_doc.get("user_id"),
            "username": user_doc.get("username"),
            "email": user_doc.get("email"),
            "full_name": user_doc.get("full_name"),
            "role_id": user_doc.get("role_id"),
            "merchant_id": user_doc.get("merchant_id"),
            "merchant_type": user_doc.get("merchant_type"),
            "status": user_doc.get("status", UserStatus.ACTIVE),
            "last_login": user_doc.get("last_login"),
            "created_by": user_doc.get("created_by"),
            "created_at": user_doc.get("created_at"),
        }

    try:
        merchant_name = None
        merchant_code = None

        merchant_id = user_doc.get("merchant_id")
        if merchant_id:
            # A failed merchant lookup only costs the merchant fields.
            try:
                logger.debug(
                    "Fetching merchant info",
                    extra={"event": "system_user_merchant_info_fetch_start", "merchant_id": merchant_id},
                )
                merchant_doc = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                    {"merchant_id": merchant_id},
                    {"merchant_name": 1, "merchant_code": 1},
                )
                if not merchant_doc:
                    logger.warning(
                        "Merchant not found",
                        extra={"event": "system_user_merchant_info_not_found", "merchant_id": merchant_id},
                    )
                else:
                    merchant_name = merchant_doc.get("merchant_name")
                    merchant_code = merchant_doc.get("merchant_code")
                    logger.debug(
                        "Found merchant",
                        extra={
                            "event": "system_user_merchant_info_found",
                            "merchant_id": merchant_id,
                            "merchant_name": merchant_name,
                            "merchant_code": merchant_code,
                        },
                    )
            except Exception as e:
                logger.warning(
                    "Error fetching merchant info",
                    extra={
                        "event": "system_user_merchant_info_fetch_failed",
                        "merchant_id": merchant_id,
                        "error": str(e),
                    },
                )

        # Only the failed-login counter is exposed from the stored settings.
        security_settings = None
        stored_security = user_doc.get("security_settings")
        if stored_security:
            security_settings = SecuritySettings(
                failed_login_attempts=stored_security.get("failed_login_attempts", 0)
            )

        return UserInfoResponse(
            **_core_fields(),
            merchant_name=merchant_name,
            merchant_code=merchant_code,
            security_settings=security_settings,
        )
    except Exception as e:
        logger.error(
            "Error converting user document to UserInfoResponse",
            extra={"event": "system_user_response_convert_failed", "error": str(e)},
            exc_info=True,
        )
        # Fall back to the basic fields only.
        return UserInfoResponse(**_core_fields())
async def get_all_roles(self) -> List[dict]:
    """Fetch every document from the access-roles collection.

    Returns:
        All role documents as dicts, or an empty list when the query
        raises (the error is logged).
    """
    try:
        roles_collection = self.db[SCM_ACCESS_ROLES_COLLECTION]
        return await roles_collection.find({}).to_list(length=None)
    except Exception as e:
        logger.error(
            "Error fetching access roles",
            extra={"event": "system_user_access_roles_fetch_failed", "error": str(e)},
            exc_info=True,
        )
        return []
async def get_current_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
    """
    Get current user profile with merchant details for GET /me endpoint.

    The profile is assembled from the user document, the merchant document
    (name and code) and the employee document (photo, phone and location
    settings). Merchant and employee lookups are best-effort: if either
    fails, a warning is logged and the corresponding fields stay None.

    Args:
        user_id: User ID from JWT token

    Returns:
        Dictionary with user profile and merchant info, or None if the
        user is not found or an unexpected error occurs (logged)
    """
    try:
        user_doc = await self.collection.find_one({"user_id": user_id})
        if not user_doc:
            logger.warning(
                "User not found for profile fetch",
                extra={"event": "system_user_profile_not_found", "user_id": user_id},
            )
            return None

        merchant_name = None
        merchant_code = None

        # Fetch merchant info if merchant_id exists
        if user_doc.get("merchant_id"):
            try:
                merchant_id = user_doc["merchant_id"]
                merchant_doc = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                    {"merchant_id": merchant_id},
                    {"merchant_name": 1, "merchant_code": 1},
                )
                if merchant_doc:
                    merchant_name = merchant_doc.get("merchant_name")
                    merchant_code = merchant_doc.get("merchant_code")
                    logger.debug(
                        "Found merchant",
                        extra={
                            "event": "system_user_profile_merchant_found",
                            "merchant_id": merchant_id,
                            "merchant_name": merchant_name,
                            "merchant_code": merchant_code,
                        },
                    )
                else:
                    logger.warning(
                        "Merchant not found for profile fetch",
                        extra={"event": "system_user_profile_merchant_not_found", "merchant_id": merchant_id},
                    )
            except Exception as e:
                logger.warning(
                    "Error fetching merchant info",
                    extra={"event": "system_user_profile_merchant_fetch_failed", "merchant_id": user_doc.get("merchant_id"), "error": str(e)},
                )

        # Split full_name on the first run of whitespace: the first token
        # is the first name, everything after it the last name.
        full_name = user_doc.get("full_name", "")
        first_name = None
        last_name = None
        if full_name:
            name_parts = full_name.strip().split(None, 1)
            first_name = name_parts[0] if len(name_parts) > 0 else None
            last_name = name_parts[1] if len(name_parts) > 1 else None

        profile = {
            "id": user_doc.get("user_id"),
            "email": user_doc.get("email"),
            "firstName": first_name,
            "lastName": last_name,
            "role": user_doc.get("role_id"),
            "merchantId": user_doc.get("merchant_id"),
            "merchantName": merchant_name,
            "merchantCode": merchant_code,
            "merchantType": user_doc.get("merchant_type"),
            "username": user_doc.get("username"),
            "status": user_doc.get("status"),
            "lastLogin": user_doc.get("last_login"),
            "photoUrl": None,
            "phone": None,
            "backgroundTrackingOptIn": None,
            "periodInMins": None,
        }

        # Enrich with employee data from scm_employees
        try:
            employee_doc = await self.db["scm_employees"].find_one(
                {"user_id": user_id},
                {
                    "photo_url": 1,
                    "phone": 1,
                    "location_settings.background_tracking_opt_in": 1,
                    "location_settings.period_in_mins": 1,
                },
            )
            if employee_doc:
                profile["photoUrl"] = employee_doc.get("photo_url")
                profile["phone"] = employee_doc.get("phone")
                # `or {}` also covers location_settings stored as null,
                # which a plain .get(key, {}) default does not.
                loc = employee_doc.get("location_settings") or {}
                profile["backgroundTrackingOptIn"] = loc.get("background_tracking_opt_in")
                profile["periodInMins"] = loc.get("period_in_mins")
        except Exception as e:
            logger.warning(
                "Could not fetch employee data for user profile",
                extra={"event": "system_user_profile_employee_fetch_failed", "user_id": user_id, "error": str(e)},
            )

        logger.info(
            "Profile fetched successfully for user",
            extra={"event": "system_user_profile_fetched", "user_id": user_id},
        )
        return profile
    except Exception as e:
        logger.error(
            "Error fetching user profile",
            extra={"event": "system_user_profile_fetch_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return None