bookmyservice-ums / app /models /wallet_model.py
MukeshKapoor25's picture
refactor(user): rename user_id to customer_id across all modules
a9ccd3b
import logging
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any

from bson import ObjectId

from app.core.nosql_client import db
logger = logging.getLogger(__name__)
class WalletModel:
    """Data-access helpers for customer wallets and their transaction ledger.

    Wallet documents live in the ``user_wallets`` collection and ledger
    entries in ``wallet_transactions``.  Every method is a static coroutine
    that reports failure through its return value (``False``, ``0.0`` or an
    empty result) instead of raising; the underlying error is logged.

    NOTE(review): balances are stored as floats, as in the original code.
    Binary floats cannot represent every decimal amount exactly -- consider
    integer minor units or Decimal128 if exact money arithmetic is required.
    """

    wallet_collection = db["user_wallets"]
    transaction_collection = db["wallet_transactions"]

    # Transaction types that increase / decrease the wallet balance.
    _CREDIT_TYPES = frozenset({"credit", "refund", "cashback"})
    _DEBIT_TYPES = frozenset({"debit", "payment", "withdrawal"})

    @staticmethod
    async def get_wallet_balance(customer_id: str) -> float:
        """Return the current balance for ``customer_id``.

        A wallet with a zero balance is created on first access.  Returns
        ``0.0`` when the wallet is new or when the lookup fails.
        """
        try:
            wallet = await WalletModel.wallet_collection.find_one(
                {"customer_id": customer_id}
            )
            if wallet is not None:
                return wallet.get("balance", 0.0)

            # First access for this customer: lazily create an empty wallet.
            await WalletModel.create_wallet(customer_id)
            return 0.0
        except Exception as e:
            logger.exception(
                "Error getting wallet balance for user %s: %s", customer_id, e
            )
            return 0.0

    @staticmethod
    async def create_wallet(customer_id: str, initial_balance: float = 0.0) -> bool:
        """Create a wallet for ``customer_id`` if none exists yet.

        Returns ``True`` only when a new wallet document was inserted, and
        ``False`` when a wallet already exists or the write fails.
        """
        try:
            now = datetime.now(timezone.utc)
            # Upsert with $setOnInsert so an existing wallet is never
            # overwritten and a second call cannot insert a duplicate.
            result = await WalletModel.wallet_collection.update_one(
                {"customer_id": customer_id},
                {
                    "$setOnInsert": {
                        "balance": initial_balance,
                        "created_at": now,
                        "updated_at": now,
                    }
                },
                upsert=True,
            )
            if result.upserted_id is None:
                logger.warning(
                    "Wallet already exists for user %s; not creating a new one",
                    customer_id,
                )
                return False

            logger.info(
                "Created wallet for user %s with balance %s",
                customer_id,
                initial_balance,
            )
            return True
        except Exception as e:
            logger.exception("Error creating wallet for user %s: %s", customer_id, e)
            return False

    @staticmethod
    async def update_balance(
        customer_id: str,
        amount: float,
        transaction_type: str,
        description: str = "",
        reference_id: Optional[str] = None,
    ) -> bool:
        """Apply a credit or debit to a wallet and record it in the ledger.

        Args:
            customer_id: Owner of the wallet.
            amount: Non-negative, finite amount to move.
            transaction_type: ``credit``/``refund``/``cashback`` add to the
                balance; ``debit``/``payment``/``withdrawal`` subtract from it.
            description: Free-text note stored on the ledger entry.
            reference_id: Optional external reference stored on the entry.

        Returns:
            ``True`` when both the balance change and the ledger entry were
            written; ``False`` for an unknown type, an invalid amount,
            insufficient funds, or any database error.
        """
        try:
            if transaction_type in WalletModel._CREDIT_TYPES:
                is_credit = True
            elif transaction_type in WalletModel._DEBIT_TYPES:
                is_credit = False
            else:
                logger.error("Invalid transaction type: %s", transaction_type)
                return False

            # A negative amount would silently reverse the direction of the
            # transaction (a negative "debit" adds money).  The chained
            # comparison is also False for NaN and for infinity.
            if not 0 <= amount < float("inf"):
                logger.error(
                    "Invalid transaction amount for user %s: %s", customer_id, amount
                )
                return False

            now = datetime.now(timezone.utc)
            wallets = WalletModel.wallet_collection
            owner = {"customer_id": customer_id}

            # The balance is changed with an atomic $inc rather than a
            # read-then-$set, so concurrent updates cannot overwrite each
            # other.  find_one_and_update returns the document as it was
            # BEFORE the update (the driver default), which supplies an
            # accurate balance_before for the ledger entry.
            if is_credit:
                delta = amount
                previous = await wallets.find_one_and_update(
                    owner,
                    {
                        "$inc": {"balance": delta},
                        "$set": {"updated_at": now},
                        "$setOnInsert": {"created_at": now},
                    },
                    upsert=True,
                )
            else:
                delta = -amount
                # Reading first keeps the create-on-first-use behaviour and
                # provides the balance for the warning below.
                current_balance = await WalletModel.get_wallet_balance(customer_id)
                previous = None
                if current_balance >= amount:
                    # The balance condition in the filter makes the funds
                    # check and the deduction a single atomic step.
                    previous = await wallets.find_one_and_update(
                        {**owner, "balance": {"$gte": amount}},
                        {"$inc": {"balance": delta}, "$set": {"updated_at": now}},
                    )
                if previous is None:
                    # Either the balance was too low, or a concurrent debit
                    # consumed it after the read (then "Current" is stale).
                    logger.warning(
                        "Insufficient balance for user %s. Current: %s, Required: %s",
                        customer_id,
                        current_balance,
                        amount,
                    )
                    return False

            # previous is None only when the credit upsert created the wallet.
            balance_before = (
                previous.get("balance", 0.0) if previous is not None else 0.0
            )
            balance_after = balance_before + delta

            transaction_doc = {
                "customer_id": customer_id,
                "amount": amount,
                "transaction_type": transaction_type,
                "description": description,
                "reference_id": reference_id,
                "balance_before": balance_before,
                "balance_after": balance_after,
                "timestamp": now,
                "status": "completed",
            }
            try:
                await WalletModel.transaction_collection.insert_one(transaction_doc)
            except Exception:
                # The two writes are not wrapped in a database transaction,
                # so the balance change above has already been applied.
                logger.exception(
                    "Wallet for user %s was updated (%s of %s) but the "
                    "transaction record could not be written",
                    customer_id,
                    transaction_type,
                    amount,
                )
                return False

            logger.info(
                "Updated wallet for user %s: %s of %s, new balance: %s",
                customer_id,
                transaction_type,
                amount,
                balance_after,
            )
            return True
        except Exception as e:
            logger.exception(
                "Error updating wallet balance for user %s: %s", customer_id, e
            )
            return False

    @staticmethod
    async def get_transaction_history(
        customer_id: str, page: int = 1, per_page: int = 20
    ) -> Dict[str, Any]:
        """Return one page of a customer's transactions, newest first.

        ``page`` and ``per_page`` values below 1 are clamped to 1.  The
        result contains ``transactions`` (documents with ``_id`` converted
        to ``str``), ``total_count``, ``page``, ``per_page`` and
        ``total_pages``.  On error an empty page is returned.
        """
        try:
            # Values below 1 would give a negative skip, an unlimited
            # limit(0), or a division by zero when computing total_pages.
            page = max(1, page)
            per_page = max(1, per_page)

            query = {"customer_id": customer_id}
            cursor = (
                WalletModel.transaction_collection.find(query)
                .sort("timestamp", -1)
                .skip((page - 1) * per_page)
                .limit(per_page)
            )

            transactions = []
            async for record in cursor:
                # ObjectId is not JSON serializable; expose it as a string.
                record["_id"] = str(record["_id"])
                transactions.append(record)

            total_count = await WalletModel.transaction_collection.count_documents(
                query
            )
            return {
                "transactions": transactions,
                "total_count": total_count,
                "page": page,
                "per_page": per_page,
                # Ceiling division without floats.
                "total_pages": (total_count + per_page - 1) // per_page,
            }
        except Exception as e:
            logger.exception(
                "Error getting transaction history for user %s: %s", customer_id, e
            )
            return {
                "transactions": [],
                "total_count": 0,
                "page": page,
                "per_page": per_page,
                "total_pages": 0,
            }

    @staticmethod
    async def get_wallet_summary(customer_id: str) -> Dict[str, Any]:
        """Return the current balance together with the five latest transactions."""
        try:
            balance = await WalletModel.get_wallet_balance(customer_id)
            history = await WalletModel.get_transaction_history(
                customer_id, page=1, per_page=5
            )
            return {
                "balance": balance,
                "recent_transactions": history["transactions"],
            }
        except Exception as e:
            logger.exception(
                "Error getting wallet summary for user %s: %s", customer_id, e
            )
            return {
                "balance": 0.0,
                "recent_transactions": [],
            }