Spaces:
Sleeping
Sleeping
import logging
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any

from bson import ObjectId

from app.core.nosql_client import db
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class WalletModel:
    """Model for managing user wallet operations.

    Every method is a static coroutine and is meant to be called on the
    class itself (``await WalletModel.get_wallet_balance(...)``). Wallets
    live in the ``user_wallets`` collection and the per-wallet ledger in
    ``wallet_transactions``.
    """

    wallet_collection = db["user_wallets"]
    transaction_collection = db["wallet_transactions"]

    # Transaction types that add to / take from the balance.
    _CREDIT_TYPES = frozenset({"credit", "refund", "cashback"})
    _DEBIT_TYPES = frozenset({"debit", "payment", "withdrawal"})

    @staticmethod
    async def get_wallet_balance(customer_id: str) -> float:
        """Get current wallet balance for a user.

        If the user has no wallet document yet, one is created with a zero
        balance. This is a best-effort read: any error is logged and
        reported as a balance of 0.0 rather than raised.

        Args:
            customer_id: Identifier stored in the wallet's ``customer_id`` field.

        Returns:
            The stored balance, or 0.0 when the wallet is new, has no
            ``balance`` field, or the lookup failed.
        """
        try:
            wallet = await WalletModel.wallet_collection.find_one(
                {"customer_id": customer_id}
            )
            if wallet is not None:
                return wallet.get("balance", 0.0)

            # Create wallet if doesn't exist
            await WalletModel.create_wallet(customer_id)
            return 0.0
        except Exception as e:
            logger.error(f"Error getting wallet balance for user {customer_id}: {e}")
            return 0.0

    @staticmethod
    async def create_wallet(customer_id: str, initial_balance: float = 0.0) -> bool:
        """Create a new wallet for a user.

        Args:
            customer_id: Identifier to store in the wallet document.
            initial_balance: Opening balance for the new wallet.

        Returns:
            True when the insert reported an id, False when it failed
            (the error is logged, not raised).
        """
        try:
            # One timestamp so created_at and updated_at are identical on creation.
            now = datetime.now(timezone.utc)
            wallet_doc = {
                "customer_id": customer_id,
                "balance": initial_balance,
                "created_at": now,
                "updated_at": now,
            }
            result = await WalletModel.wallet_collection.insert_one(wallet_doc)
            logger.info(f"Created wallet for user {customer_id} with balance {initial_balance}")
            return result.inserted_id is not None
        except Exception as e:
            logger.error(f"Error creating wallet for user {customer_id}: {e}")
            return False

    @staticmethod
    async def update_balance(
        customer_id: str,
        amount: float,
        transaction_type: str,
        description: str = "",
        reference_id: Optional[str] = None,
    ) -> bool:
        """Update wallet balance and create transaction record.

        The balance change is applied with a single ``$inc`` update instead
        of read-compute-``$set``, so concurrent updates cannot overwrite each
        other and a failed balance read can never reset the stored balance.
        For debits the "enough funds" check is part of the update filter.

        Args:
            customer_id: Wallet owner.
            amount: Non-negative amount to credit or debit.
            transaction_type: One of "credit", "refund", "cashback"
                (adds to the balance) or "debit", "payment", "withdrawal"
                (subtracts from it).
            description: Free-text note stored on the transaction record.
            reference_id: Optional external reference stored on the record.

        Returns:
            True when the balance was changed and the transaction recorded;
            False for an unknown type, a negative/NaN amount, insufficient
            funds, or any error (errors are logged, not raised).
        """
        try:
            if transaction_type in WalletModel._CREDIT_TYPES:
                is_debit = False
            elif transaction_type in WalletModel._DEBIT_TYPES:
                is_debit = True
            else:
                logger.error(f"Invalid transaction type: {transaction_type}")
                return False

            # A negative amount would turn a debit into a credit (and vice
            # versa); "not >=" also rejects NaN.
            if not amount >= 0:
                logger.error(f"Invalid transaction amount: {amount}")
                return False

            # Only a positive debit needs a funds check. Zero-amount
            # operations behave like a no-op credit, as they always succeeded.
            needs_funds = is_debit and amount > 0
            delta = -amount if needs_funds else amount
            now = datetime.now(timezone.utc)

            wallet_filter: Dict[str, Any] = {"customer_id": customer_id}
            if needs_funds:
                # Match only when the wallet can cover the debit, making the
                # check and the deduction one atomic operation.
                wallet_filter["balance"] = {"$gte": amount}

            # By default find_one_and_update returns the document as it was
            # BEFORE the update, or None when nothing matched / a new
            # document was upserted (PyMongo/Motor semantics).
            wallet_before = await WalletModel.wallet_collection.find_one_and_update(
                wallet_filter,
                {
                    "$inc": {"balance": delta},
                    "$set": {"updated_at": now},
                    "$setOnInsert": {"created_at": now},
                },
                # Never upsert a guarded debit: a non-matching filter means
                # "missing wallet or not enough funds", not "create one".
                upsert=not needs_funds,
            )

            if wallet_before is None and needs_funds:
                # Read the balance only for the log message; this also
                # creates the wallet when the user had none.
                current_balance = await WalletModel.get_wallet_balance(customer_id)
                logger.warning(f"Insufficient balance for user {customer_id}. Current: {current_balance}, Required: {amount}")
                return False

            balance_before = wallet_before.get("balance", 0.0) if wallet_before else 0.0
            balance_after = balance_before + delta

            # NOTE: the balance update above and this ledger insert are two
            # separate writes; if the insert fails the balance has already
            # changed and this method returns False.
            transaction_doc = {
                "customer_id": customer_id,
                "amount": amount,
                "transaction_type": transaction_type,
                "description": description,
                "reference_id": reference_id,
                "balance_before": balance_before,
                "balance_after": balance_after,
                "timestamp": now,
                "status": "completed",
            }
            await WalletModel.transaction_collection.insert_one(transaction_doc)
            logger.info(f"Updated wallet for user {customer_id}: {transaction_type} of {amount}, new balance: {balance_after}")
            return True
        except Exception as e:
            logger.error(f"Error updating wallet balance for user {customer_id}: {e}")
            return False

    @staticmethod
    async def get_transaction_history(customer_id: str, page: int = 1, per_page: int = 20) -> Dict[str, Any]:
        """Get paginated transaction history for a user.

        Args:
            customer_id: Wallet owner whose transactions are listed.
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size; values below 1 are treated as 1.

        Returns:
            A dict with ``transactions`` (newest first, ``_id`` converted to
            ``str``), ``total_count``, ``page``, ``per_page`` and
            ``total_pages``. On error an empty page is returned and the
            error is logged.
        """
        # Clamp instead of failing: a negative skip is rejected by the
        # driver and per_page == 0 would mean "no limit" followed by a
        # division by zero below.
        page = max(1, page)
        per_page = max(1, per_page)
        try:
            query = {"customer_id": customer_id}
            skip = (page - 1) * per_page

            # Get transactions with pagination
            cursor = (
                WalletModel.transaction_collection.find(query)
                .sort("timestamp", -1)
                .skip(skip)
                .limit(per_page)
            )

            transactions = []
            async for transaction in cursor:
                # Convert ObjectId to string for JSON serialization
                transaction["_id"] = str(transaction["_id"])
                transactions.append(transaction)

            # Get total count
            total_count = await WalletModel.transaction_collection.count_documents(query)

            return {
                "transactions": transactions,
                "total_count": total_count,
                "page": page,
                "per_page": per_page,
                # Ceiling division without floats.
                "total_pages": (total_count + per_page - 1) // per_page,
            }
        except Exception as e:
            logger.error(f"Error getting transaction history for user {customer_id}: {e}")
            return {
                "transactions": [],
                "total_count": 0,
                "page": page,
                "per_page": per_page,
                "total_pages": 0,
            }

    @staticmethod
    async def get_wallet_summary(customer_id: str) -> Dict[str, Any]:
        """Get wallet summary including balance and recent transactions.

        Args:
            customer_id: Wallet owner.

        Returns:
            A dict with ``balance`` and ``recent_transactions`` (the first
            page of history with five entries). On error a zero balance and
            an empty list are returned and the error is logged.
        """
        try:
            balance = await WalletModel.get_wallet_balance(customer_id)
            recent_transactions = await WalletModel.get_transaction_history(
                customer_id, page=1, per_page=5
            )
            return {
                "balance": balance,
                "recent_transactions": recent_transactions["transactions"],
            }
        except Exception as e:
            logger.error(f"Error getting wallet summary for user {customer_id}: {e}")
            return {
                "balance": 0.0,
                "recent_transactions": [],
            }