# Source page header (not part of the module):
# Avatar caption: MukeshKapoor25's picture
# Last commit: refactor(dependencies): Extract TokenUser model to separate module (9f3256f)
"""
System User service for authentication and user management.
"""
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Tuple, Dict, Any
import uuid
from motor.motor_asyncio import AsyncIOMotorDatabase
from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import HTTPException, status
from app.core.logging import get_logger
from app.system_users.models.model import SystemUserModel, UserStatus
from app.system_users.schemas.schema import (
CreateUserRequest,
UpdateUserRequest,
ChangePasswordRequest,
UserInfoResponse,
SecuritySettings,
)
from app.system_users.constants import SCM_SYSTEM_USERS_COLLECTION
from app.access_roles.constants import SCM_ACCESS_ROLES_COLLECTION
from app.merchants.constants import SCM_MERCHANTS_COLLECTION
from app.employees.constants import SCM_EMPLOYEES_COLLECTION
from app.core.config import settings
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT settings
# Signing algorithm passed to jwt.encode / jwt.decode in this module.
ALGORITHM = "HS256"
# Default access-token lifetime in minutes, used when no expires_delta is given.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Refresh-token lifetime in days.
REFRESH_TOKEN_EXPIRE_DAYS = 7
class SystemUserService:
"""Service class for system user operations."""
def __init__(self, db: AsyncIOMotorDatabase):
    """Keep the database handle and resolve the system-users collection from it."""
    users_collection = db[SCM_SYSTEM_USERS_COLLECTION]
    self.db = db
    self.collection = users_collection
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    An exception raised by the hashing context is logged and reported as a
    failed verification (False) instead of propagating.
    """
    try:
        matches = pwd_context.verify(plain_password, hashed_password)
    except Exception as exc:
        failure_details = {"event": "system_user_password_verify_failed", "error": str(exc)}
        logger.error("Error verifying password", extra=failure_details, exc_info=True)
        return False
    return matches
@staticmethod
def get_password_hash(password: str) -> str:
    """Hash a plaintext password with the module's password context."""
    hashed = pwd_context.hash(password)
    return hashed
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not mutated.
        expires_delta: Token lifetime. When None, the default of
            ACCESS_TOKEN_EXPIRE_MINUTES minutes is used.

    Returns:
        The encoded JWT string, carrying "exp" and "type": "access" claims.
    """
    to_encode = data.copy()
    # Compare against None explicitly: a zero timedelta is falsy, so a plain
    # truthiness test would silently swap it for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
@staticmethod
def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token.

    The claims in ``data`` are copied and extended with an "exp" claim
    REFRESH_TOKEN_EXPIRE_DAYS days from now and "type": "refresh".
    """
    claims = data.copy()
    lifetime = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    claims["exp"] = datetime.utcnow() + lifetime
    claims["type"] = "refresh"
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=ALGORITHM)
@staticmethod
def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]:
    """Decode a JWT and return its claims when its "type" matches.

    Returns None when decoding raises JWTError (a warning is logged) or when
    the token's "type" claim differs from ``token_type``.
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        logger.warning(
            "Token verification failed",
            extra={"event": "system_user_token_verify_failed", "token_type": token_type, "error": str(exc)},
        )
        return None
    return claims if claims.get("type") == token_type else None
async def get_user_by_id(self, user_id: str) -> Optional[SystemUserModel]:
    """Look up one user by ``user_id``.

    Returns the matching SystemUserModel, or None when no document matches
    or when the lookup fails (the failure is logged).
    """
    try:
        document = await self.collection.find_one({"user_id": user_id})
        return SystemUserModel(**document) if document else None
    except Exception as exc:
        logger.error(
            "Error getting user by ID",
            extra={"event": "system_user_get_by_id_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return None
async def get_user_by_username(self, username: str) -> Optional[SystemUserModel]:
    """Look up one user by username; the username is lower-cased for the query.

    Returns the matching SystemUserModel, or None when no document matches
    or when the lookup fails (the failure is logged).
    """
    try:
        document = await self.collection.find_one({"username": username.lower()})
        return SystemUserModel(**document) if document else None
    except Exception as exc:
        logger.error(
            "Error getting user by username",
            extra={"event": "system_user_get_by_username_failed", "username": username, "error": str(exc)},
            exc_info=True,
        )
        return None
async def get_user_by_email(self, email: str, user_id=None) -> Optional[SystemUserModel]:
    """Look up one user by email; the email is lower-cased for the query.

    When ``user_id`` is supplied and the matching document carries that same
    user_id, None is returned: the address belongs to the user being checked
    and is therefore not a conflict. Lookup failures are logged and also
    yield None.
    """
    try:
        found = await self.collection.find_one({"email": email.lower()})
        if found:
            # Only consult the stored owner when the caller asked for the
            # same-user exemption.
            owner_id = found.get("user_id") if user_id is not None else None
            belongs_to_caller = bool(owner_id) and str(owner_id) == str(user_id)
            if not belongs_to_caller:
                return SystemUserModel(**found)
        return None
    except Exception as exc:
        logger.error(
            "Error getting user by email",
            extra={"event": "system_user_get_by_email_failed", "email": email, "error": str(exc)},
            exc_info=True,
        )
        return None
async def create_user(self, user_data: CreateUserRequest, created_by: str, user_id=None) -> SystemUserModel:
    """Insert a new system user document and return its model.

    The email is checked for an existing owner first; a document carrying the
    supplied ``user_id`` is not treated as a conflict. When ``user_id`` is
    None a random UUID is generated. The account is stored with ACTIVE status
    and a hash of the supplied password; username and email are lower-cased.

    Raises:
        HTTPException: 400 when the email already exists; 500 for any other
            failure while creating the user.
    """
    try:
        email_owner = await self.get_user_by_email(user_data.email, user_id)
        if email_owner:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already exists",
            )

        new_user_id = uuid.uuid4() if user_id is None else user_id
        hashed_password = self.get_password_hash(user_data.password)

        fields = {
            "user_id": str(new_user_id),
            "username": user_data.username.lower(),
            "email": user_data.email.lower(),
            "phone": user_data.phone,
            "password_hash": hashed_password,
            "full_name": user_data.full_name,
            "role_id": user_data.role_id,
            "merchant_id": str(user_data.merchant_id),
            "merchant_type": user_data.merchant_type,
            "status": UserStatus.ACTIVE,  # new accounts start out active
            "last_login": None,
            "created_by": created_by,
            "created_at": datetime.utcnow(),
            "metadata": user_data.metadata,
        }
        new_user = SystemUserModel(**fields)
        await self.collection.insert_one(new_user.model_dump())

        logger.info(
            "User created successfully",
            extra={"event": "system_user_created", "user_id": str(new_user_id), "created_by": created_by},
        )
        return new_user
    except HTTPException as http_exc:
        # Deliberate HTTP errors (such as the duplicate email) pass through unchanged.
        logger.error(
            "HTTPException in create_user",
            extra={"event": "system_user_create_http_exception", "detail": str(http_exc.detail)},
        )
        raise
    except Exception as exc:
        logger.error(
            "Error creating user",
            extra={"event": "system_user_create_failed", "error": str(exc)},
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create user",
        )
async def authenticate_user(self, identifier: str, password: str) -> Tuple[Optional[SystemUserModel], str]:
    """Authenticate by email or username.

    The identifier is tried as an email first, then as a username. On success
    the user's last_login / updated_at / updated_by fields are stamped and
    ``(user, message)`` is returned; on any failure ``(None, message)`` is
    returned and nothing is raised.

    NOTE(review): the failure messages differ for an unknown identifier, a
    non-active account and a wrong password, so a caller can tell these cases
    apart — confirm that this is intended before exposing them to clients.
    """
    try:
        account = await self.get_user_by_email(identifier) or await self.get_user_by_username(identifier)
        if not account:
            logger.warning(
                "Login attempt with non-existent identifier",
                extra={"event": "system_user_auth_identifier_not_found", "identifier": identifier},
            )
            return None, "Invalid email or username"

        # Only ACTIVE accounts may log in; the status is checked before the password.
        allowed_statuses = (UserStatus.ACTIVE,)
        if account.status not in allowed_statuses:
            return None, f"Account is {account.status.value}"

        password_ok = self.verify_password(password, account.password_hash)
        if not password_ok:
            return None, "Invalid username or password"

        login_time = datetime.utcnow()
        login_stamp = {"last_login": login_time, "updated_at": login_time, "updated_by": account.user_id}
        await self.collection.update_one({"user_id": account.user_id}, {"$set": login_stamp})
        account.last_login = login_time
        return account, "Authentication successful"
    except Exception as exc:
        logger.error(
            "Error during authentication",
            extra={"event": "system_user_auth_failed", "identifier": identifier, "error": str(exc)},
            exc_info=True,
        )
        return None, "Authentication failed"
async def update_user(self, user_id: str, update_data: UpdateUserRequest, updated_by: str) -> Optional[SystemUserModel]:
    """Apply the explicitly-set fields of ``update_data`` to a user.

    Email and username are lower-cased before storing. When the phone number
    is part of the update it is also written to the employee document with
    the same user_id; a failure of that sync is logged and does not fail the
    update. Returns the re-read user, or None when the user does not exist
    or the update fails.
    """
    try:
        if not await self.get_user_by_id(user_id):
            return None

        changes = update_data.model_dump(exclude_unset=True)
        for lowered_field in ("email", "username"):
            if lowered_field in changes:
                changes[lowered_field] = changes[lowered_field].lower()

        if changes:
            changes["updated_at"] = datetime.utcnow()
            changes["updated_by"] = updated_by
            await self.collection.update_one({"user_id": user_id}, {"$set": changes})

            if "phone" in changes:
                # Best-effort mirror of the phone number onto the employee record.
                try:
                    employee_changes = {
                        "phone": changes["phone"],
                        # Written as an ISO string here, unlike the datetime on the user document.
                        "updated_at": datetime.utcnow().isoformat(),
                        "updated_by": updated_by,
                    }
                    sync_result = await self.db[SCM_EMPLOYEES_COLLECTION].update_one(
                        {"user_id": user_id},
                        {"$set": employee_changes},
                    )
                    if sync_result.matched_count > 0:
                        logger.info(
                            "Synced phone to employee record",
                            extra={"event": "employee_phone_sync_success", "user_id": user_id},
                        )
                    else:
                        logger.debug(
                            "No employee record found to sync phone",
                            extra={"event": "employee_phone_sync_no_match", "user_id": user_id},
                        )
                except Exception as sync_exc:
                    logger.error(
                        "Failed to sync phone to employee record",
                        extra={"event": "employee_phone_sync_failed", "user_id": user_id, "error": str(sync_exc)},
                        exc_info=True,
                    )

        refreshed = await self.get_user_by_id(user_id)
        return refreshed
    except Exception as exc:
        logger.error(
            "Error updating user",
            extra={"event": "system_user_update_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return None
async def change_password(self, user_id: str, current_password: str, new_password: str) -> bool:
    """Change a user's password after verifying the current one.

    Args:
        user_id: ID of the user changing their own password.
        current_password: Plaintext password to verify against the stored hash.
        new_password: Plaintext replacement; stored only as a hash.

    Returns:
        True when the password was updated. False when the user does not
        exist, the current password does not verify, or the update fails
        (failures are logged).
    """
    try:
        user = await self.get_user_by_id(user_id)
        if not user:
            return False
        if not self.verify_password(current_password, user.password_hash):
            return False

        new_password_hash = self.get_password_hash(new_password)
        await self.collection.update_one(
            {"user_id": user_id},
            {"$set": {
                "password_hash": new_password_hash,
                # reset_password_admin sets this flag to force a change; once the
                # user has chosen a new password the requirement is fulfilled.
                "must_change_password": False,
                "updated_at": datetime.utcnow(),
                "updated_by": user_id,
            }},
        )
        logger.info(
            "Password changed for user",
            extra={"event": "system_user_password_changed", "user_id": user_id},
        )
        return True
    except Exception as e:
        logger.error(
            "Error changing password for user",
            extra={"event": "system_user_password_change_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return False
async def list_users(
    self,
    page: int = 1,
    page_size: int = 20,
    status_filter: Optional[List[UserStatus]] = None,
    projection_list: Optional[List[str]] = None,
    current_user=None,
    roles_filter: Optional[List[str]] = None,
    merchant_type_filter: Optional[List[str]] = None,
) -> Tuple[List[Any], int]:
    """List users with pagination, optional filters and field projection.

    Scoping: when the caller's active role grants all of view/create/update/
    delete on ``system_users``, users of every merchant are listed. Otherwise
    the listing is restricted to the caller's merchant_id when that is set.

    Args:
        page: 1-based page number.
        page_size: Maximum number of users per page.
        status_filter: Keep only users whose status is in this list.
        projection_list: Field names to return. When given, raw dicts are
            returned instead of SystemUserModel instances.
        current_user: The calling user; must expose ``merchant_id`` and
            ``role_id``.
        roles_filter: Keep only users whose role_id is in this list.
        merchant_type_filter: Keep only users whose merchant_type is in this list.

    Returns:
        ``(users, total_count)`` where total_count is the number of documents
        matching the filter before pagination. ``([], 0)`` is returned when
        no current_user is supplied or when the query fails (both are logged).
    """
    try:
        if current_user is None:
            # Scoping depends on the caller; previously this case surfaced as an
            # AttributeError swallowed by the handler below. Same result, stated explicitly.
            logger.error(
                "Error listing users",
                extra={"event": "system_user_list_failed", "error": "current_user is required"},
            )
            return [], 0

        skip = (page - 1) * page_size
        merchant_id = current_user.merchant_id

        # Build query filter
        query_filter: Dict[str, Any] = {}
        if status_filter is not None:
            query_filter["status"] = {"$in": [user_status.value for user_status in status_filter]}
        if roles_filter is not None:
            query_filter["role_id"] = {"$in": roles_filter}
        if merchant_type_filter is not None:
            query_filter["merchant_type"] = {"$in": merchant_type_filter}

        # Full system_users access (view+create+update+delete) lifts the merchant scope.
        has_full_access = False
        if current_user.role_id:
            role_doc = await self.db[SCM_ACCESS_ROLES_COLLECTION].find_one(
                {"role_id": current_user.role_id, "is_active": True},
                {"permissions.system_users": 1, "_id": 0},
            )
            if role_doc:
                # Tolerate explicit nulls in the role document.
                su_perms = (role_doc.get("permissions") or {}).get("system_users") or []
                has_full_access = all(
                    action in su_perms for action in ("view", "create", "update", "delete")
                )
        # NOTE(review): a caller without full access and without a merchant_id is
        # not scoped at all — confirm this is intended.
        if not has_full_access and merchant_id is not None:
            query_filter["merchant_id"] = merchant_id

        logger.info(
            "list_users called",
            extra={
                "event": "system_user_list_called",
                "page": page,
                "page_size": page_size,
                "status_filter": [s.value for s in status_filter] if status_filter else None,
                "projection_list": projection_list,
            },
        )
        logger.info(
            "list_users database context",
            extra={"event": "system_user_list_db_context", "database": self.db.name, "collection": self.collection.name},
        )
        logger.info(
            "list_users query filter prepared",
            extra={"event": "system_user_list_query_filter", "query_filter": query_filter},
        )

        total_count = await self.collection.count_documents(query_filter)
        logger.info(
            "list_users total count computed",
            extra={"event": "system_user_list_total_count", "total_count": total_count},
        )

        # Build the MongoDB projection; _id is always excluded when projecting.
        projection_dict = None
        if projection_list:
            projection_dict = {field: 1 for field in projection_list}
            projection_dict["_id"] = 0

        cursor = (
            self.collection.find(query_filter, projection_dict)
            .sort("created_at", -1)
            .skip(skip)
            .limit(page_size)
        )
        docs = await cursor.to_list(length=page_size)

        # Raw dicts for projected queries, models otherwise.
        if projection_list:
            # Projected documents are returned as-is, so make sure the password
            # hash never leaves the service even if it was requested.
            for doc in docs:
                doc.pop("password_hash", None)
            users = docs
        else:
            users = [SystemUserModel(**doc) for doc in docs]

        logger.info(
            "list_users returning users",
            extra={"event": "system_user_list_return", "returned_count": len(users)},
        )
        return users, total_count
    except Exception as e:
        logger.error(
            "Error listing users",
            extra={"event": "system_user_list_failed", "error": str(e)},
            exc_info=True,
        )
        return [], 0
async def deactivate_user(self, user_id: str, deactivated_by: str) -> bool:
    """Set a user's status to INACTIVE.

    Returns True only when the update modified a document; False when
    nothing was modified or the update failed (the failure is logged).
    """
    try:
        changes = {
            "status": UserStatus.INACTIVE.value,
            "updated_at": datetime.utcnow(),
            "updated_by": deactivated_by,
        }
        outcome = await self.collection.update_one({"user_id": user_id}, {"$set": changes})
        if outcome.modified_count <= 0:
            return False
        logger.info(
            "User deactivated",
            extra={"event": "system_user_deactivated", "user_id": user_id, "deactivated_by": deactivated_by},
        )
        return True
    except Exception as exc:
        logger.error(
            "Error deactivating user",
            extra={"event": "system_user_deactivate_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return False
async def suspend_user(self, user_id: str, reason: str, suspended_by: str) -> bool:
    """Set a user's status to SUSPENDED and record why, when and by whom.

    Returns True only when the update modified a document; False when
    nothing was modified or the update failed (the failure is logged).
    """
    try:
        suspension = {
            "status": UserStatus.SUSPENDED.value,
            "suspension_reason": reason,
            "suspended_at": datetime.utcnow(),
            "suspended_by": suspended_by,
            "updated_at": datetime.utcnow(),
            "updated_by": suspended_by,
        }
        outcome = await self.collection.update_one({"user_id": user_id}, {"$set": suspension})
        if outcome.modified_count <= 0:
            return False
        log_details = {
            "event": "system_user_suspended",
            "user_id": user_id,
            "suspended_by": suspended_by,
            "reason": reason,
        }
        logger.info("User suspended", extra=log_details)
        return True
    except Exception as exc:
        logger.error(
            "Error suspending user",
            extra={"event": "system_user_suspend_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return False
async def reset_password_admin(self, user_id: str, reset_by: str) -> Optional[str]:
    """Replace a user's password with a temporary 6-digit PIN.

    The PIN's hash is stored together with ``must_change_password=True`` and
    reset audit fields. When the user has a phone number, a password-reset
    notification is queued; a failure to queue it is logged and does not
    undo the reset.

    Returns:
        The temporary PIN in plaintext, or None when the user does not exist,
        no document was modified, or the reset failed.
    """
    try:
        user = await self.get_user_by_id(user_id)
        if not user:
            return None

        # 6-digit numeric PIN (works with Fast2SMS OTP route)
        pin_digits = [str(secrets.randbelow(10)) for _ in range(6)]
        temp_password = "".join(pin_digits)
        temp_password_hash = self.get_password_hash(temp_password)

        reset_fields = {
            "password_hash": temp_password_hash,
            "must_change_password": True,
            "password_reset_at": datetime.utcnow(),
            "password_reset_by": reset_by,
            "updated_at": datetime.utcnow(),
            "updated_by": reset_by,
        }
        outcome = await self.collection.update_one({"user_id": user_id}, {"$set": reset_fields})
        if outcome.modified_count <= 0:
            return None

        logger.info(
            "Password reset for user",
            extra={"event": "system_user_password_reset", "user_id": user_id, "reset_by": reset_by},
        )

        # Queue SMS / WhatsApp notification; best effort only.
        if user.phone:
            try:
                from app.utils.notification_queue import NotificationQueue

                merchant_name = ""
                if user.merchant_id:
                    merchant_doc = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                        {"merchant_id": user.merchant_id},
                        {"merchant_name": 1, "_id": 0},
                    )
                    merchant_name = (merchant_doc or {}).get("merchant_name", "")

                await NotificationQueue.send_password_reset(
                    recipient=user.phone,
                    name=user.full_name or user.username,
                    new_password=temp_password,
                    merchant_id=user.merchant_id or "",
                    merchant_name=merchant_name,
                )
                queued_details = {
                    "event": "system_user_password_reset_notification_queued",
                    "user_id": user_id,
                    "recipient": user.phone,
                }
                logger.info("Password reset SMS/WhatsApp notification queued", extra=queued_details)
            except Exception as notify_exc:
                failed_details = {
                    "event": "system_user_password_reset_notification_queue_failed",
                    "user_id": user_id,
                    "error": str(notify_exc),
                }
                logger.error("Error queuing password reset notification", extra=failed_details, exc_info=True)

        return temp_password
    except Exception as exc:
        logger.error(
            "Error resetting password for user",
            extra={"event": "system_user_password_reset_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return None
async def unlock_user(self, user_id: str, unlocked_by: str) -> bool:
    """Reactivate a user and clear lock and suspension fields.

    Sets status to ACTIVE and removes the account-lock, failed-login and
    suspension fields from the document. Returns True only when the update
    modified a document; False otherwise or on failure (which is logged).

    NOTE(review): this unsets a top-level ``failed_login_attempts`` field,
    while the response converter in this class reads the counter from
    ``security_settings`` — confirm which location is authoritative.
    """
    try:
        reactivate = {
            "status": UserStatus.ACTIVE.value,
            "updated_at": datetime.utcnow(),
            "updated_by": unlocked_by,
        }
        cleared_fields = (
            "account_locked_until",
            "failed_login_attempts",
            "suspension_reason",
            "suspended_at",
            "suspended_by",
        )
        outcome = await self.collection.update_one(
            {"user_id": user_id},
            {"$set": reactivate, "$unset": {field: "" for field in cleared_fields}},
        )
        if outcome.modified_count <= 0:
            return False
        logger.info(
            "User unlocked and reactivated",
            extra={"event": "system_user_unlocked", "user_id": user_id, "unlocked_by": unlocked_by},
        )
        return True
    except Exception as exc:
        logger.error(
            "Error unlocking user",
            extra={"event": "system_user_unlock_failed", "user_id": user_id, "error": str(exc)},
            exc_info=True,
        )
        return False
def convert_to_user_info_response(self, user: SystemUserModel) -> UserInfoResponse:
    """Copy the public fields of a SystemUserModel into a UserInfoResponse."""
    copied_fields = (
        "user_id",
        "username",
        "email",
        "full_name",
        "role_id",
        "merchant_id",
        "merchant_type",
        "status",
        "last_login",
        "created_by",
        "created_at",
    )
    return UserInfoResponse(**{name: getattr(user, name) for name in copied_fields})
async def convert_to_user_info_response_with_merchant_and_security(
    self, user_doc: Dict[str, Any]
) -> UserInfoResponse:
    """
    Build a UserInfoResponse from a raw user document, enriched with the
    merchant's name/code and the user's security settings.
    Args:
        user_doc: User document from MongoDB
    Returns:
        UserInfoResponse with merchant_name, merchant_code and security_settings
        populated when available. If the enriched conversion fails, a basic
        response without those three fields is returned instead.
    """

    def base_fields() -> Dict[str, Any]:
        # Fields shared by the enriched response and the fallback response.
        return {
            "user_id": user_doc.get("user_id"),
            "username": user_doc.get("username"),
            "email": user_doc.get("email"),
            "full_name": user_doc.get("full_name"),
            "role_id": user_doc.get("role_id"),
            "merchant_id": user_doc.get("merchant_id"),
            "merchant_type": user_doc.get("merchant_type"),
            "status": user_doc.get("status", UserStatus.ACTIVE),
            "last_login": user_doc.get("last_login"),
            "created_by": user_doc.get("created_by"),
            "created_at": user_doc.get("created_at"),
        }

    try:
        merchant_name = merchant_code = None
        merchant_id = user_doc.get("merchant_id")
        if merchant_id:
            # The merchant lookup is best effort: failures only produce a warning.
            try:
                logger.debug(
                    "Fetching merchant info",
                    extra={"event": "system_user_merchant_info_fetch_start", "merchant_id": merchant_id},
                )
                merchant_doc = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                    {"merchant_id": merchant_id},
                    {"merchant_name": 1, "merchant_code": 1}
                )
                if not merchant_doc:
                    logger.warning(
                        "Merchant not found",
                        extra={"event": "system_user_merchant_info_not_found", "merchant_id": merchant_id},
                    )
                else:
                    merchant_name = merchant_doc.get("merchant_name")
                    merchant_code = merchant_doc.get("merchant_code")
                    found_details = {
                        "event": "system_user_merchant_info_found",
                        "merchant_id": merchant_id,
                        "merchant_name": merchant_name,
                        "merchant_code": merchant_code,
                    }
                    logger.debug("Found merchant", extra=found_details)
            except Exception as lookup_exc:
                failed_details = {
                    "event": "system_user_merchant_info_fetch_failed",
                    "merchant_id": merchant_id,
                    "error": str(lookup_exc),
                }
                logger.warning("Error fetching merchant info", extra=failed_details)

        # Only the failed-login counter is carried over from the stored settings.
        stored_security = user_doc.get("security_settings")
        security_settings = (
            SecuritySettings(failed_login_attempts=stored_security.get("failed_login_attempts", 0))
            if stored_security
            else None
        )

        return UserInfoResponse(
            **base_fields(),
            merchant_name=merchant_name,
            merchant_code=merchant_code,
            security_settings=security_settings,
        )
    except Exception as exc:
        logger.error(
            "Error converting user document to UserInfoResponse",
            extra={"event": "system_user_response_convert_failed", "error": str(exc)},
            exc_info=True,
        )
        # Fallback: basic conversion without merchant and security info.
        return UserInfoResponse(**base_fields())
async def get_all_roles(self) -> List[dict]:
    """Return every document of the access-roles collection.

    An empty list is returned when the query fails (the failure is logged).
    """
    try:
        roles_collection = self.db[SCM_ACCESS_ROLES_COLLECTION]
        return await roles_collection.find({}).to_list(length=None)
    except Exception as exc:
        logger.error(
            "Error fetching access roles",
            extra={"event": "system_user_access_roles_fetch_failed", "error": str(exc)},
            exc_info=True,
        )
        return []
async def get_current_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
    """
    Get current user profile with merchant details for GET /me endpoint.

    The profile is assembled from the user document, the merchant document
    (name and code) and the employee document (photo, phone and location
    settings). The merchant and employee lookups are best effort: when either
    fails, a warning is logged and the corresponding fields stay None.
    Args:
        user_id: User ID from JWT token
    Returns:
        Dictionary with user profile and merchant info (camelCase keys), or
        None if the user is not found or the fetch fails.
    """
    try:
        user_doc = await self.collection.find_one({"user_id": user_id})
        if not user_doc:
            logger.warning(
                "User not found for profile fetch",
                extra={"event": "system_user_profile_not_found", "user_id": user_id},
            )
            return None

        # Merchant info stays None unless the lookup below succeeds.
        merchant_name = None
        merchant_code = None
        if user_doc.get("merchant_id"):
            try:
                merchant_id = user_doc["merchant_id"]
                merchant_doc = await self.db[SCM_MERCHANTS_COLLECTION].find_one(
                    {"merchant_id": merchant_id},
                    {"merchant_name": 1, "merchant_code": 1}
                )
                if merchant_doc:
                    merchant_name = merchant_doc.get("merchant_name")
                    merchant_code = merchant_doc.get("merchant_code")
                    logger.debug(
                        "Found merchant",
                        extra={
                            "event": "system_user_profile_merchant_found",
                            "merchant_id": merchant_id,
                            "merchant_name": merchant_name,
                            "merchant_code": merchant_code,
                        },
                    )
                else:
                    logger.warning(
                        "Merchant not found for profile fetch",
                        extra={"event": "system_user_profile_merchant_not_found", "merchant_id": merchant_id},
                    )
            except Exception as e:
                logger.warning(
                    "Error fetching merchant info",
                    extra={"event": "system_user_profile_merchant_fetch_failed", "merchant_id": user_doc.get("merchant_id"), "error": str(e)},
                )

        # Split full_name on the first run of whitespace: the first token is the
        # first name, everything after it the last name.
        full_name = user_doc.get("full_name", "")
        first_name = None
        last_name = None
        if full_name:
            name_parts = full_name.strip().split(None, 1)
            first_name = name_parts[0] if len(name_parts) > 0 else None
            last_name = name_parts[1] if len(name_parts) > 1 else None

        profile = {
            "id": user_doc.get("user_id"),
            "email": user_doc.get("email"),
            "firstName": first_name,
            "lastName": last_name,
            "role": user_doc.get("role_id"),
            "merchantId": user_doc.get("merchant_id"),
            "merchantName": merchant_name,
            "merchantCode": merchant_code,
            "merchantType": user_doc.get("merchant_type"),
            "username": user_doc.get("username"),
            "status": user_doc.get("status"),
            "lastLogin": user_doc.get("last_login"),
            "photoUrl": None,
            "phone": None,
            "backgroundTrackingOptIn": None,
            "periodInMins": None,
        }

        # Enrich with employee data. The shared collection constant is used so
        # this reads the same collection update_user syncs the phone number to.
        try:
            employee_doc = await self.db[SCM_EMPLOYEES_COLLECTION].find_one(
                {"user_id": user_id},
                {
                    "photo_url": 1,
                    "phone": 1,
                    "location_settings.background_tracking_opt_in": 1,
                    "location_settings.period_in_mins": 1,
                }
            )
            if employee_doc:
                profile["photoUrl"] = employee_doc.get("photo_url")
                profile["phone"] = employee_doc.get("phone")
                # `or {}` also covers a location_settings stored as an explicit null.
                loc = employee_doc.get("location_settings") or {}
                profile["backgroundTrackingOptIn"] = loc.get("background_tracking_opt_in")
                profile["periodInMins"] = loc.get("period_in_mins")
        except Exception as e:
            logger.warning(
                "Could not fetch employee data for user profile",
                extra={"event": "system_user_profile_employee_fetch_failed", "user_id": user_id, "error": str(e)},
            )

        logger.info(
            "Profile fetched successfully for user",
            extra={"event": "system_user_profile_fetched", "user_id": user_id},
        )
        return profile
    except Exception as e:
        logger.error(
            "Error fetching user profile",
            extra={"event": "system_user_profile_fetch_failed", "user_id": user_id, "error": str(e)},
            exc_info=True,
        )
        return None