vanitha
(System_user)-updated last_login_at in change_password()
e642846
"""
System User service for authentication and user management.
"""
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
from uuid import UUID
import uuid
from motor.motor_asyncio import AsyncIOMotorDatabase
from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import HTTPException, status
import aiohttp
from app.system_users.models.model import (
SystemUserModel,
UserStatus,
UserRole,
LoginAttemptModel,
SecuritySettingsModel
)
from app.system_users.schemas.schema import (
CreateUserRequest,
UpdateUserRequest,
ChangePasswordRequest,
UserInfoResponse
)
from app.constants.collections import AUTH_SYSTEM_USERS_COLLECTION, AUTH_ACCESS_ROLES_COLLECTION, SCM_ACCESS_ROLES_COLLECTION
from app.core.config import settings
from app.core.logging import get_logger
from app.utils.email_service import email_service
logger = get_logger(__name__)
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT settings - using settings from config
class SystemUserService:
"""Service class for system user operations."""
def __init__(self, db: AsyncIOMotorDatabase):
self.db = db
self.collection = db[AUTH_SYSTEM_USERS_COLLECTION]
async def get_merchant_info(self, merchant_id: str) -> Optional[Dict[str, Any]]:
"""Get merchant information from SCM database."""
try:
# Try to get from SCM database directly
# Assuming both services use the same MongoDB instance but different databases
scm_db_name = "cuatrolabs_scm" # SCM database name
scm_db = self.collection.database.client.get_database(scm_db_name)
scm_merchants_collection = scm_db["scm_merchants"]
merchant = await scm_merchants_collection.find_one(
{"merchant_id": merchant_id},
{"merchant_id": 1, "merchant_type": 1, "merchant_name": 1}
)
if merchant:
logger.info(f"Found merchant {merchant_id} with type {merchant.get('merchant_type')}")
return merchant
else:
logger.warning(f"Merchant {merchant_id} not found in SCM database")
return None
except Exception as e:
logger.warning(f"Error fetching merchant info from SCM database: {e}")
return None
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
try:
return pwd_context.verify(plain_password, hashed_password)
except Exception as e:
logger.error(f"Error verifying password: {e}")
return False
@staticmethod
def get_password_hash(password: str) -> str:
"""Generate password hash."""
return pwd_context.hash(password)
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token."""
to_encode = data.copy()
# Convert UUID objects to strings for JSON serialization
for key, value in to_encode.items():
if isinstance(value, UUID):
to_encode[key] = str(value)
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=settings.TOKEN_EXPIRATION_HOURS)
# Only set type to "access" if not already specified in data
if "type" not in to_encode:
to_encode["type"] = "access"
to_encode["exp"] = expire
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
@staticmethod
def create_refresh_token(data: dict) -> str:
"""Create JWT refresh token."""
to_encode = data.copy()
# Convert UUID objects to strings for JSON serialization
for key, value in to_encode.items():
if isinstance(value, UUID):
to_encode[key] = str(value)
expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
@staticmethod
def verify_token(token: str, token_type: str = "access") -> Optional[Dict[str, Any]]:
"""Verify JWT token and return payload."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
if payload.get("type") != token_type:
return None
return payload
except JWTError as e:
logger.warning(f"Token verification failed: {e}")
return None
async def get_user_by_id(self, user_id: UUID) -> Optional[SystemUserModel]:
"""Get user by user_id."""
try:
user_doc = await self.collection.find_one({"user_id": str(user_id)})
if user_doc:
return SystemUserModel(**user_doc)
return None
except Exception as e:
logger.error(f"Error getting user by ID {user_id}: {e}")
return None
async def get_user_by_username(self, username: str) -> Optional[SystemUserModel]:
"""Get user by username."""
try:
user_doc = await self.collection.find_one({"username": username.lower()})
if user_doc:
return SystemUserModel(**user_doc)
return None
except Exception as e:
logger.error(f"Error getting user by username {username}: {e}")
return None
async def get_user_by_email(self, email: str) -> Optional[SystemUserModel]:
"""Get user by email."""
try:
user_doc = await self.collection.find_one({"email": email.lower()})
if user_doc:
return SystemUserModel(**user_doc)
return None
except Exception as e:
logger.error(f"Error getting user by email {email}: {e}")
return None
async def get_user_by_phone(self, phone: str) -> Optional[SystemUserModel]:
"""Get user by phone number."""
try:
user_doc = await self.collection.find_one({"phone": phone})
if user_doc:
return SystemUserModel(**user_doc)
return None
except Exception as e:
logger.error(f"Error getting user by phone {phone}: {e}")
return None
async def create_user(self, user_data: CreateUserRequest, created_by: str) -> SystemUserModel:
"""Create a new user."""
try:
# Check if username or email already exists
existing_user = await self.get_user_by_username(user_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already exists"
)
existing_email = await self.get_user_by_email(user_data.email)
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already exists"
)
# Generate user ID
# user_id = f"usr_{secrets.token_urlsafe(16)}"
user_id=str(uuid.uuid4())
# Hash password
password_hash = self.get_password_hash(user_data.password)
# Get merchant_type if not provided
merchant_type = user_data.merchant_type
if not merchant_type and user_data.merchant_id:
merchant_info = await self.get_merchant_info(user_data.merchant_id)
if merchant_info:
merchant_type = merchant_info.get('merchant_type')
logger.info(f"Fetched merchant_type '{merchant_type}' for merchant_id '{user_data.merchant_id}'")
# Create user model
user_model = SystemUserModel(
user_id=str(user_id),
username=user_data.username.lower(),
email=user_data.email.lower(),
merchant_id=user_data.merchant_id,
merchant_type=merchant_type,
password_hash=password_hash,
first_name=user_data.first_name,
last_name=user_data.last_name,
phone=user_data.phone,
role=user_data.role,
permissions=user_data.permissions,
status=UserStatus.ACTIVE, # Set as active by default
created_by=str(created_by),
created_at=datetime.utcnow()
)
doc = user_model.model_dump(by_alias=True, exclude_none=True)
# safety: convert UUID -> str if any slips through
for k, v in doc.items():
if isinstance(v, uuid.UUID):
doc[k] = str(v)
await self.collection.insert_one(doc)
logger.info(f"User created successfully: {user_id}")
return user_model
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating user: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user"
)
async def authenticate_user(self, email_or_phone: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None) -> Tuple[Optional[SystemUserModel], str]:
"""Authenticate user with email/phone/username and password."""
try:
# Get user by email, phone, or username
user = await self.get_user_by_email(email_or_phone)
if not user:
user = await self.get_user_by_phone(email_or_phone)
if not user:
user = await self.get_user_by_username(email_or_phone)
# Record failed attempt if user not found
if not user:
logger.warning(f"Login attempt with non-existent email/phone/username: {email_or_phone}")
return None, "Invalid email, phone number, or username"
# Check if account is locked
if (user.security_settings.account_locked_until and
user.security_settings.account_locked_until > datetime.utcnow()):
return None, f"Account is locked until {user.security_settings.account_locked_until}"
# Check account status
if user.status not in [UserStatus.ACTIVE]:
return None, f"Account is {user.status.value}"
# Verify password
if not self.verify_password(password, user.password_hash):
await self._record_failed_login(user, ip_address, user_agent)
return None, "Invalid username or password"
# Password correct - reset failed attempts and record successful login
await self._record_successful_login(user, ip_address, user_agent)
return user, "Authentication successful"
except Exception as e:
logger.error(f"Error during authentication: {e}")
return None, "Authentication failed"
async def _record_failed_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]):
"""Record failed login attempt and update security settings."""
try:
failed_attempts = user.security_settings.failed_login_attempts + 1
now = datetime.utcnow()
# Add login attempt to history
login_attempt = LoginAttemptModel(
timestamp=now,
ip_address=ip_address,
user_agent=user_agent,
success=False,
failure_reason="Invalid password"
)
# Keep only last 10 attempts
attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt]
update_data = {
"security_settings.failed_login_attempts": failed_attempts,
"security_settings.last_failed_login": now,
"security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history],
"updated_at": now
}
# Lock account if too many failed attempts
if failed_attempts >= settings.MAX_FAILED_LOGIN_ATTEMPTS:
lock_until = now + timedelta(minutes=settings.ACCOUNT_LOCK_DURATION_MINUTES)
update_data["security_settings.account_locked_until"] = lock_until
logger.warning(f"Account locked due to failed login attempts: {user.user_id}")
await self.collection.update_one(
{"user_id": user.user_id},
{"$set": update_data}
)
except Exception as e:
logger.error(f"Error recording failed login: {e}")
async def _record_successful_login(self, user: SystemUserModel, ip_address: Optional[str], user_agent: Optional[str]):
"""Record successful login and reset security counters."""
try:
now = datetime.utcnow()
# Add login attempt to history
login_attempt = LoginAttemptModel(
timestamp=now,
ip_address=ip_address,
user_agent=user_agent,
success=True
)
# Keep only last 10 attempts
attempts_history = user.security_settings.login_attempts[-9:] + [login_attempt]
await self.collection.update_one(
{"user_id": user.user_id},
{"$set": {
"last_login_at": now,
"last_login_ip": ip_address,
"security_settings.failed_login_attempts": 0,
"security_settings.last_failed_login": None,
"security_settings.account_locked_until": None,
"security_settings.login_attempts": [attempt.model_dump() for attempt in attempts_history],
"updated_at": now
}}
)
except Exception as e:
logger.error(f"Error recording successful login: {e}")
async def record_logout(self, user: SystemUserModel, ip_address: Optional[str] = None, user_agent: Optional[str] = None):
"""
Record logout action for audit purposes.
This logs the logout event to both the application logs and optionally to an audit collection.
Args:
user: The user who is logging out
ip_address: IP address from which logout occurred
user_agent: User agent string of the client
"""
try:
now = datetime.utcnow()
# Calculate session duration if last login is available
session_duration = None
if user.last_login_at:
session_duration = (now - user.last_login_at).total_seconds()
# Create audit log entry
audit_log = {
"event_type": "logout",
"user_id": user.user_id,
"username": user.username,
"email": user.email,
"merchant_id": user.merchant_id,
"merchant_type": user.merchant_type,
"role": user.role,
"timestamp": now,
"ip_address": ip_address,
"user_agent": user_agent,
"session_duration_seconds": session_duration,
"last_login_at": user.last_login_at
}
# Log to application logs with structured data
logger.info(
f"User logout: {user.username}",
extra={
"event": "user_logout",
"user_id": user.user_id,
"username": user.username,
"email": user.email,
"merchant_id": user.merchant_id,
"merchant_type": user.merchant_type,
"role": user.role,
"ip_address": ip_address,
"user_agent": user_agent,
"session_duration": session_duration,
"last_login_at": user.last_login_at.isoformat() if user.last_login_at else None
}
)
# Store in audit logs collection for compliance and analysis
try:
auth_logs_collection = self.db["scm_auth_logs"]
await auth_logs_collection.insert_one(audit_log)
logger.debug(f"Logout audit record stored for user {user.user_id}")
except Exception as audit_error:
# Don't fail logout if audit logging fails
logger.error(
f"Failed to store logout audit log: {audit_error}",
extra={"user_id": user.user_id}
)
except Exception as e:
logger.error(
f"Error recording logout audit: {e}",
extra={"user_id": user.user_id if user else None},
exc_info=True
)
def is_password_expired(self, user: SystemUserModel) -> bool:
"""
Check if user's password has expired based on rotation policy.
Args:
user: The system user model
Returns:
bool: True if password is expired, False otherwise
"""
if not user.security_settings.last_password_change:
# If no password change record, consider it expired (initial setup)
return True
now = datetime.utcnow()
days_since_change = (now - user.security_settings.last_password_change).days
return days_since_change >= settings.PASSWORD_ROTATION_DAYS
def get_password_age_days(self, user: SystemUserModel) -> int:
"""
Get the age of user's password in days.
Args:
user: The system user model
Returns:
int: Number of days since last password change
"""
if not user.security_settings.last_password_change:
return -1 # Never changed
now = datetime.utcnow()
return (now - user.security_settings.last_password_change).days
def get_password_rotation_status(self, user: SystemUserModel) -> Dict[str, Any]:
"""
Get comprehensive password rotation status for a user.
Args:
user: The system user model
Returns:
Dict: Password rotation status information
"""
password_age = self.get_password_age_days(user)
is_expired = self.is_password_expired(user)
days_until_expiry = settings.PASSWORD_ROTATION_DAYS - password_age
warning_threshold = settings.PASSWORD_ROTATION_WARNING_DAYS
status = "expired" if is_expired else "active"
if password_age >= 0 and days_until_expiry <= warning_threshold:
status = "warning"
return {
"password_status": status,
"password_age_days": password_age,
"password_rotation_days_required": settings.PASSWORD_ROTATION_DAYS,
"days_until_expiry": max(0, days_until_expiry),
"last_password_change": user.security_settings.last_password_change,
"requires_password_change": is_expired and settings.ENFORCE_PASSWORD_ROTATION,
"warning_threshold_days": warning_threshold
}
async def mark_password_change_required(self, user_id: UUID, reason: str = "password_rotation") -> bool:
"""
Mark that a user must change their password.
Args:
user_id: User ID
reason: Reason for requiring password change
Returns:
bool: Success status
"""
try:
now = datetime.utcnow()
audit_log = {
"event_type": "password_change_required",
"user_id": user_id,
"reason": reason,
"timestamp": now,
"enforced_at": now
}
# Store audit log
try:
auth_logs_collection = self.db["scm_auth_logs"]
await auth_logs_collection.insert_one(audit_log)
except Exception as audit_error:
logger.warning(f"Failed to store password change audit log: {audit_error}")
# Update user record
await self.collection.update_one(
{"user_id": user_id},
{"$set": {
"security_settings.require_password_change": True,
"updated_at": now
}}
)
logger.info(
f"Password change marked as required for user {user_id}",
extra={"reason": reason, "user_id": user_id}
)
return True
except Exception as e:
logger.error(f"Error marking password change required for {user_id}: {e}")
return False
async def record_password_change(self, user_id: UUID, changed_by: str = "user") -> bool:
"""
Record password change for rotation policy compliance.
Args:
user_id: User ID
changed_by: Who initiated the change (user, admin, system)
Returns:
bool: Success status
"""
try:
now = datetime.utcnow()
audit_log = {
"event_type": "password_changed",
"user_id": user_id,
"changed_by": changed_by,
"timestamp": now,
"previous_rotation_status": None
}
# Get user's previous status
user = await self.get_user_by_id(user_id)
if user:
audit_log["previous_rotation_status"] = self.get_password_rotation_status(user)
# Store audit log
try:
auth_logs_collection = self.db["scm_auth_logs"]
await auth_logs_collection.insert_one(audit_log)
except Exception as audit_error:
logger.warning(f"Failed to store password change audit log: {audit_error}")
logger.info(
f"Password change recorded for user {user_id}",
extra={
"event": "password_changed",
"user_id": user_id,
"changed_by": changed_by
}
)
return True
except Exception as e:
logger.error(f"Error recording password change for {user_id}: {e}")
return False
"""Update user information."""
try:
user = await self.get_user_by_id(user_id)
if not user:
return None
update_dict = {}
for field, value in update_data.dict(exclude_unset=True).items():
if value is not None:
update_dict[field] = value
if update_dict:
update_dict["updated_at"] = datetime.utcnow()
update_dict["updated_by"] = updated_by
await self.collection.update_one(
{"user_id": user_id},
{"$set": update_dict}
)
return await self.get_user_by_id(user_id)
except Exception as e:
logger.error(f"Error updating user {user_id}: {e}")
return None
async def change_password(self, user_id: UUID, current_password: str, new_password: str) -> bool:
"""Change user password."""
try:
user = await self.get_user_by_id(user_id)
if not user:
return False
# Verify current password
if not self.verify_password(current_password, user.password_hash):
return False
# Hash new password
new_password_hash = self.get_password_hash(new_password)
now = datetime.utcnow()
# Update password
await self.collection.update_one(
{"user_id": str(user_id)},
{"$set": {
"password_hash": new_password_hash,
"security_settings.last_password_change": datetime.utcnow(),
"security_settings.require_password_change": False,
"last_login_at": now,
"updated_at": datetime.utcnow()
}}
)
# Record password change for rotation policy compliance
await self.record_password_change(user_id, changed_by="user")
logger.info(f"Password changed for user: {user_id}")
return True
except Exception as e:
logger.error(f"Error changing password for user {user_id}: {e}")
return False
async def list_users(self, page: int = 1, page_size: int = 20, status_filter: Optional[UserStatus] = None) -> Tuple[List[SystemUserModel], int]:
"""List users with pagination."""
try:
skip = (page - 1) * page_size
# Build query filter
query_filter = {}
if status_filter:
query_filter["status"] = status_filter.value
# Get total count
total_count = await self.collection.count_documents(query_filter)
# Get users - don't exclude password_hash to avoid validation errors
cursor = self.collection.find(query_filter).skip(skip).limit(page_size).sort("created_at", -1)
users = []
async for user_doc in cursor:
users.append(SystemUserModel(**user_doc))
return users, total_count
except Exception as e:
logger.error(f"Error listing users: {e}")
return [], 0
async def list_users_with_projection(
self,
filters: Optional[Dict[str, Any]] = None,
skip: int = 0,
limit: int = 100,
projection_list: Optional[List[str]] = None,
status_filter: Optional[UserStatus] = None,
role_filter: Optional[str] = None,
merchant_id_filter: Optional[str] = None,
merchant_type_filter: Optional[str] = None
):
"""
List users with projection support following API standards.
Args:
filters: Additional filter criteria
skip: Number of records to skip
limit: Maximum number of records to return
projection_list: List of fields to include in response
status_filter: Filter by user status
role_filter: Filter by user role
merchant_id_filter: Filter by merchant ID
merchant_type_filter: Filter by merchant type
Returns:
List of users (raw dict if projection, SystemUserModel otherwise)
"""
try:
# Build query filter
query_filter = {}
# Add specific filters
if status_filter:
query_filter["status"] = status_filter.value
if role_filter:
query_filter["role"] = role_filter
if merchant_id_filter:
query_filter["merchant_id"] = merchant_id_filter
if merchant_type_filter:
query_filter["merchant_type"] = merchant_type_filter
# Add additional filters
if filters:
query_filter.update(filters)
# Build MongoDB projection
projection_dict = None
if projection_list:
projection_dict = {field: 1 for field in projection_list}
projection_dict["_id"] = 0
# Query with projection
cursor = self.collection.find(query_filter, projection_dict).skip(skip).limit(limit).sort("created_at", -1)
docs = await cursor.to_list(length=limit)
# Return raw dict if projection, model otherwise
if projection_list:
return docs
else:
return [SystemUserModel(**doc) for doc in docs]
except Exception as e:
logger.error(f"Error listing users with projection: {e}")
return []
async def deactivate_user(self, user_id: str, deactivated_by: str) -> bool:
"""Deactivate user account."""
try:
result = await self.collection.update_one(
{"user_id": user_id},
{"$set": {
"status": UserStatus.INACTIVE.value,
"updated_at": datetime.utcnow(),
"updated_by": deactivated_by
}}
)
if result.modified_count > 0:
logger.info(f"User deactivated: {user_id}")
return True
return False
except Exception as e:
logger.error(f"Error deactivating user {user_id}: {e}")
return False
async def create_password_reset_token(self, email: str) -> Optional[str]:
"""
Create a password reset token for a user.
Args:
email: User's email address
Returns:
Reset token string or None if user not found
"""
try:
# Get user by email
user = await self.get_user_by_email(email)
if not user:
logger.warning(f"Password reset requested for non-existent email: {email}")
# Don't reveal that user doesn't exist for security
return None
# Check if account is active
if user.status not in [UserStatus.ACTIVE, UserStatus.PENDING_ACTIVATION]:
logger.warning(f"Password reset requested for {user.status.value} account: {email}")
return None
# Generate secure token
reset_token = secrets.token_urlsafe(32)
# Create JWT token with expiration
token_data = {
"sub": user.user_id,
"email": user.email,
"type": "password_reset",
"token": reset_token,
"exp": datetime.utcnow() + timedelta(minutes=settings.PASSWORD_RESET_TOKEN_EXPIRATION_MINUTES)
}
jwt_token = jwt.encode(token_data, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
# Store reset token info in database
await self.collection.update_one(
{"user_id": user.user_id},
{"$set": {
"security_settings.password_reset_token": reset_token,
"security_settings.password_reset_token_created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}}
)
logger.info(f"Password reset token created for user: {user.user_id}")
return jwt_token
except Exception as e:
logger.error(f"Error creating password reset token: {e}")
return None
async def send_password_reset_email(self, email: str) -> bool:
"""
Generate password reset token and send reset email.
Args:
email: User's email address
Returns:
True if email was sent successfully, False otherwise
"""
try:
# Get user
user = await self.get_user_by_email(email)
if not user:
# Don't reveal that user doesn't exist
logger.warning(f"Password reset email requested for non-existent email: {email}")
return True # Return true to not leak information
# Create reset token
reset_token = await self.create_password_reset_token(email)
if not reset_token:
logger.error(f"Failed to create reset token for: {email}")
return False
# Build reset link
reset_link = f"{settings.PASSWORD_RESET_BASE_URL}?token={reset_token}"
# Send email
email_sent = await email_service.send_password_reset_email(
to_email=user.email,
reset_link=reset_link,
user_name=user.first_name
)
if email_sent:
logger.info(f"Password reset email sent to: {email}")
else:
logger.error(f"Failed to send password reset email to: {email}")
return email_sent
except Exception as e:
logger.error(f"Error sending password reset email: {e}")
return False
async def verify_password_reset_token(self, token: str) -> Optional[Dict[str, Any]]:
"""
Verify password reset token and return user info.
Args:
token: JWT reset token
Returns:
Dict with user_id and email if valid, None otherwise
"""
try:
# Decode JWT token
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
# Check token type
if payload.get("type") != "password_reset":
logger.warning("Invalid token type for password reset")
return None
user_id = payload.get("sub")
email = payload.get("email")
reset_token = payload.get("token")
if not all([user_id, email, reset_token]):
logger.warning("Missing required fields in reset token")
return None
# Get user from database
user = await self.get_user_by_id(user_id)
if not user:
logger.warning(f"User not found for reset token: {user_id}")
return None
# Verify email matches
if user.email != email:
logger.warning(f"Email mismatch in reset token for user: {user_id}")
return None
# Verify token matches stored token
stored_token = user.security_settings.password_reset_token
if not stored_token or stored_token != reset_token:
logger.warning(f"Reset token mismatch for user: {user_id}")
return None
# Check if token was used (token should be cleared after use)
if not user.security_settings.password_reset_token:
logger.warning(f"Reset token already used for user: {user_id}")
return None
return {
"user_id": user_id,
"email": email,
"token": reset_token
}
except JWTError as e:
logger.warning(f"Invalid or expired reset token: {e}")
return None
except Exception as e:
logger.error(f"Error verifying reset token: {e}")
return None
async def reset_password_with_token(self, token: str, new_password: str) -> Tuple[bool, str]:
"""
Reset user password using reset token.
Args:
token: JWT reset token
new_password: New password to set
Returns:
Tuple of (success: bool, message: str)
"""
try:
# Verify token
token_data = await self.verify_password_reset_token(token)
if not token_data:
return False, "Invalid or expired reset token"
user_id = token_data["user_id"]
# Hash new password
new_password_hash = self.get_password_hash(new_password)
# Update password and clear reset token
result = await self.collection.update_one(
{"user_id": user_id},
{"$set": {
"password_hash": new_password_hash,
"security_settings.password_reset_token": None,
"security_settings.password_reset_token_created_at": None,
"security_settings.last_password_change": datetime.utcnow(),
"security_settings.require_password_change": False,
"security_settings.failed_login_attempts": 0,
"security_settings.account_locked_until": None,
"updated_at": datetime.utcnow()
}}
)
if result.modified_count > 0:
logger.info(f"Password reset successfully for user: {user_id}")
return True, "Password reset successfully"
else:
logger.error(f"Failed to update password for user: {user_id}")
return False, "Failed to reset password"
except Exception as e:
logger.error(f"Error resetting password: {e}")
return False, "An error occurred while resetting password"
def convert_to_user_info_response(self, user: SystemUserModel) -> UserInfoResponse:
"""Convert SystemUserModel to UserInfoResponse."""
return UserInfoResponse(
user_id=user.user_id,
username=user.username,
email=user.email,
merchant_id=user.merchant_id,
merchant_type=user.merchant_type,
full_name=user.full_name,
role=user.role,
permissions=user.permissions,
status=user.status,
last_login_at=user.last_login_at,
profile_picture_url=user.profile_picture_url
)
async def get_all_roles(self) -> List[dict]:
"""Get all access roles from database."""
try:
cursor = self.db[AUTH_ACCESS_ROLES_COLLECTION].find({})
roles = await cursor.to_list(length=None)
return roles
except Exception as e:
logger.error(f"Error fetching access roles: {e}")
return []
async def get_scm_permissions_by_role(self, role: str) -> Optional[Dict[str, Any]]:
"""
Fetch permissions from SCM access roles collection based on user role.
Maps user role values to SCM role_ids.
Args:
role: User role value (e.g., "super_admin", "admin")
Returns:
Permissions dict or None if not found
"""
try:
# Map user role values to SCM role_ids
role_to_scm_id = {
"super_admin": "role_super_admin",
"admin": "role_company_admin",
"manager": "role_cnf_manager",
"user": "role_retail_owner",
"read_only": "role_retail_staff"
}
# Get the SCM role_id
scm_role_id = role_to_scm_id.get(role)
if not scm_role_id:
logger.warning(f"No SCM mapping for role: {role}, trying direct lookup")
scm_role_id = role
# Fetch the role from SCM access roles collection
scm_role = await self.db[SCM_ACCESS_ROLES_COLLECTION].find_one(
{"role_id": scm_role_id, "is_active": True}
)
if scm_role and "permissions" in scm_role:
logger.info(f"✅ Fetched SCM permissions for {role} -> {scm_role_id}: {scm_role.get('role_name')}")
return scm_role["permissions"]
else:
logger.warning(f"❌ No permissions found in scm_access_roles for role_id: {scm_role_id}")
return None
except Exception as e:
logger.error(f"Error fetching SCM permissions for role {role}: {e}")
return None