Spaces:
Sleeping
Sleeping
File size: 6,393 Bytes
82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 a9ccd3b 82a6cc5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | from datetime import datetime
from typing import Optional, List, Dict, Any
from bson import ObjectId
import logging
from app.core.nosql_client import db
logger = logging.getLogger(__name__)
class WalletModel:
"""Model for managing user wallet operations"""
wallet_collection = db["user_wallets"]
transaction_collection = db["wallet_transactions"]
@staticmethod
async def get_wallet_balance(customer_id: str) -> float:
"""Get current wallet balance for a user"""
try:
wallet = await WalletModel.wallet_collection.find_one({"customer_id": customer_id})
if wallet:
return wallet.get("balance", 0.0)
else:
# Create wallet if doesn't exist
await WalletModel.create_wallet(customer_id)
return 0.0
except Exception as e:
logger.error(f"Error getting wallet balance for user {customer_id}: {str(e)}")
return 0.0
@staticmethod
async def create_wallet(customer_id: str, initial_balance: float = 0.0) -> bool:
"""Create a new wallet for a user"""
try:
wallet_doc = {
"customer_id": customer_id,
"balance": initial_balance,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
result = await WalletModel.wallet_collection.insert_one(wallet_doc)
logger.info(f"Created wallet for user {customer_id} with balance {initial_balance}")
return result.inserted_id is not None
except Exception as e:
logger.error(f"Error creating wallet for user {customer_id}: {str(e)}")
return False
@staticmethod
async def update_balance(customer_id: str, amount: float, transaction_type: str,
description: str = "", reference_id: str = None) -> bool:
"""Update wallet balance and create transaction record"""
try:
# Get current balance
current_balance = await WalletModel.get_wallet_balance(customer_id)
# Calculate new balance
if transaction_type in ["credit", "refund", "cashback"]:
new_balance = current_balance + amount
elif transaction_type in ["debit", "payment", "withdrawal"]:
if current_balance < amount:
logger.warning(f"Insufficient balance for user {customer_id}. Current: {current_balance}, Required: {amount}")
return False
new_balance = current_balance - amount
else:
logger.error(f"Invalid transaction type: {transaction_type}")
return False
# Update wallet balance
update_result = await WalletModel.wallet_collection.update_one(
{"customer_id": customer_id},
{
"$set": {
"balance": new_balance,
"updated_at": datetime.utcnow()
}
},
upsert=True
)
# Create transaction record
transaction_doc = {
"customer_id": customer_id,
"amount": amount,
"transaction_type": transaction_type,
"description": description,
"reference_id": reference_id,
"balance_before": current_balance,
"balance_after": new_balance,
"timestamp": datetime.utcnow(),
"status": "completed"
}
await WalletModel.transaction_collection.insert_one(transaction_doc)
logger.info(f"Updated wallet for user {customer_id}: {transaction_type} of {amount}, new balance: {new_balance}")
return True
except Exception as e:
logger.error(f"Error updating wallet balance for user {customer_id}: {str(e)}")
return False
@staticmethod
async def get_transaction_history(customer_id: str, page: int = 1, per_page: int = 20) -> Dict[str, Any]:
"""Get paginated transaction history for a user"""
try:
skip = (page - 1) * per_page
# Get transactions with pagination
cursor = WalletModel.transaction_collection.find(
{"customer_id": customer_id}
).sort("timestamp", -1).skip(skip).limit(per_page)
transactions = []
async for transaction in cursor:
# Convert ObjectId to string for JSON serialization
transaction["_id"] = str(transaction["_id"])
transactions.append(transaction)
# Get total count
total_count = await WalletModel.transaction_collection.count_documents({"customer_id": customer_id})
return {
"transactions": transactions,
"total_count": total_count,
"page": page,
"per_page": per_page,
"total_pages": (total_count + per_page - 1) // per_page
}
except Exception as e:
logger.error(f"Error getting transaction history for user {customer_id}: {str(e)}")
return {
"transactions": [],
"total_count": 0,
"page": page,
"per_page": per_page,
"total_pages": 0
}
@staticmethod
async def get_wallet_summary(customer_id: str) -> Dict[str, Any]:
"""Get wallet summary including balance and recent transactions"""
try:
balance = await WalletModel.get_wallet_balance(customer_id)
recent_transactions = await WalletModel.get_transaction_history(customer_id, page=1, per_page=5)
return {
"balance": balance,
"recent_transactions": recent_transactions["transactions"]
}
except Exception as e:
logger.error(f"Error getting wallet summary for user {customer_id}: {str(e)}")
return {
"balance": 0.0,
"recent_transactions": []
} |