Spaces:
Sleeping
Sleeping
| """ | |
| Production-Ready Repository Layer for Real Estate Tokenization Platform | |
| Implements the complete database schema with MongoDB | |
| """ | |
| from datetime import datetime | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| from pymongo.collection import Collection | |
| from pymongo import ReturnDocument, ASCENDING, DESCENDING | |
| from bson import ObjectId | |
| from db import get_db, get_next_sequence | |
| from config import settings | |
| from utils.logger import setup_logger | |
| logger = setup_logger(__name__) | |
| def _now(): | |
| """Get current UTC timestamp""" | |
| return datetime.utcnow() | |
| def _clean_object(data): | |
| """Clean MongoDB object by converting ObjectId to string""" | |
| if isinstance(data, dict): | |
| cleaned = {} | |
| for key, value in data.items(): | |
| if key == "_id": | |
| cleaned["id"] = str(value) if isinstance(value, ObjectId) else value | |
| else: | |
| cleaned[key] = _clean_object(value) | |
| return cleaned | |
| if isinstance(data, list): | |
| return [_clean_object(item) for item in data] | |
| if isinstance(data, ObjectId): | |
| return str(data) | |
| return data | |
def _to_object_id(id_str: str) -> ObjectId:
    """Convert a string ID to an ``ObjectId``.

    Args:
        id_str: The identifier to convert.

    Returns:
        The corresponding ``ObjectId``.

    Raises:
        ValueError: If ``id_str`` is ``None`` or is not a valid ObjectId value.
    """
    from bson.errors import InvalidId  # local import; bson is already a dependency

    # ObjectId(None) does not fail: it generates a brand-new random id, which
    # would silently turn a missing id into a lookup/insert for a document
    # that does not exist.  Treat it as invalid input instead.
    if id_str is None:
        raise ValueError(f"Invalid ObjectId: {id_str}")
    try:
        return ObjectId(id_str)
    except (InvalidId, TypeError) as exc:
        # Narrow catch: a bare ``except`` would also trap KeyboardInterrupt/SystemExit.
        raise ValueError(f"Invalid ObjectId: {id_str}") from exc
| def _coerce_object_id(id_value): | |
| """Return ObjectId when possible, otherwise fall back to original value.""" | |
| if id_value is None or isinstance(id_value, ObjectId): | |
| return id_value | |
| try: | |
| return ObjectId(id_value) | |
| except Exception: | |
| logger.warning("Invalid ObjectId %r; storing plain value", id_value) | |
| return id_value | |
| # ============================================================================ | |
| # USER OPERATIONS | |
| # ============================================================================ | |
def users_col(db) -> Collection:
    """Return the ``users`` collection of ``db``."""
    collection = db.users
    return collection
def create_user(db, name: str, email: str, password_hash: str, country_code: str, phone: str, role: str = "user") -> Dict[str, Any]:
    """Insert a new user and return it in cleaned (string-id) form.

    The user is stored as active and not deleted, with no wallet attached
    (``wallet_id`` is ``None``).  ``created_at`` and ``updated_at`` share one
    timestamp.
    """
    logger.info(f"Creating user: {email} with role: {role}")
    timestamp = _now()
    user_doc = dict(
        name=name,
        email=email,
        password_hash=password_hash,
        role=role,
        country_code=country_code,
        phone=phone,
        wallet_id=None,
        is_active=True,
        deleted=False,
        created_at=timestamp,
        updated_at=timestamp,
    )
    inserted_id = users_col(db).insert_one(user_doc).inserted_id
    user_doc["_id"] = inserted_id
    logger.info(f"User created successfully with ID: {inserted_id}")
    return _clean_object(user_doc)
def get_user_by_email(db, email: str) -> Optional[Dict[str, Any]]:
    """Look up a non-deleted user by email; return the cleaned document or ``None``."""
    found = users_col(db).find_one({"email": email, "deleted": False})
    if not found:
        return None
    return _clean_object(found)
def get_user_by_id(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Look up a non-deleted user by its id; return the cleaned document or ``None``.

    Raises ``ValueError`` (from ``_to_object_id``) when ``user_id`` is not a
    valid ObjectId.
    """
    lookup = {"_id": _to_object_id(user_id), "deleted": False}
    found = users_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def get_user_by_role(db, role: str) -> Optional[Dict[str, Any]]:
    """Return one active, non-deleted user holding ``role`` (e.g. 'admin'), or ``None``.

    Only a single matching document is fetched, even if several users share
    the role.
    """
    lookup = {"role": role, "deleted": False, "is_active": True}
    found = users_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def update_user(db, user_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to a user with ``$set`` and return the updated document.

    ``updated_at`` is always refreshed.  The caller's ``fields`` dict is left
    untouched.  The lookup is by ``_id`` only (soft-deleted users are not
    filtered out).

    Returns:
        The cleaned, post-update document, or ``None`` if no user matched.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Updating user %s with fields: %s", user_id, list(fields.keys()))
    # Build a new dict so the timestamp does not leak into the caller's object.
    changes = {**fields, "updated_at": _now()}
    doc = users_col(db).find_one_and_update(
        {"_id": _to_object_id(user_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
    )
    if not doc:
        return None
    logger.info("[REPO] User updated successfully")
    return _clean_object(doc)
def _resolve_xrp_balance(db, wallet_doc, fetch_live_xrp):
    """Return the XRP balance for a wallet: Redis cache, then DB cache, then (optionally) a live fetch."""
    xrp_address = wallet_doc.get("xrp_address")
    if not xrp_address:
        return 0.0

    # 1) Fast path: external cache.  The cache module is optional, and a
    #    cache outage must not take the whole listing down, since two
    #    fallbacks follow.
    cached_balance = None
    try:
        from utils.cache import get_cached_xrp_balance
        cached_balance = get_cached_xrp_balance(xrp_address)
    except ImportError:
        pass
    except Exception as exc:
        logger.warning("Failed to read cached XRP balance for %s: %s", xrp_address, exc)
    if cached_balance is not None:
        return cached_balance

    # 2) Balance cached on the wallet document.  Truthiness is intentional
    #    here: a stored 0/None falls through to the live fetch below.
    if wallet_doc.get("xrp_balance_cached"):
        return wallet_doc.get("xrp_balance_cached", 0.0)

    # 3) Live fetch, only when explicitly allowed.
    if not fetch_live_xrp:
        return 0.0
    try:
        from services.xrp_service import XRPLService
        from utils.cache import cache_xrp_balance

        live_xrp_balance = XRPLService().get_xrp_balance(xrp_address)
        # Store the result in both caches for subsequent calls.
        cache_xrp_balance(xrp_address, live_xrp_balance, ttl=300)
        wallets_col(db).update_one(
            {"_id": wallet_doc["_id"]},
            {"$set": {"xrp_balance_cached": live_xrp_balance, "balance_updated_at": _now()}},
        )
        return live_xrp_balance
    except Exception as e:
        logger.warning(f"Failed to fetch live XRP balance for {xrp_address}: {e}")
        return 0.0


def list_all_users(db, skip: int = 0, limit: int = 100, fetch_live_xrp: bool = True) -> List[Dict[str, Any]]:
    """List regular users (role == 'user', not deleted) with wallet balances.

    Admin users are excluded.  Results are sorted by ``created_at`` descending
    and paginated with ``skip``/``limit``.  Each returned user carries
    ``aed_balance``, ``xrp_address`` and ``xrp_balance``; users without a
    wallet get ``0.0`` / ``None`` / ``0.0``.

    Args:
        db: Database connection.
        skip: Number of records to skip.
        limit: Maximum number of records to return.
        fetch_live_xrp: If True, fall back to a live XRP balance lookup when
            neither the external cache nor the wallet's cached balance has a
            value.  Cached values are always preferred.
    """
    user_docs = list(
        users_col(db)
        .find({"deleted": False, "role": "user"})
        .skip(skip)
        .limit(limit)
        .sort("created_at", DESCENDING)
    )
    users_with_wallets = []
    for user_doc in user_docs:
        user = _clean_object(user_doc)
        wallet_doc = wallets_col(db).find_one({"user_id": _to_object_id(user["id"])})
        if wallet_doc:
            # wallet.balance is the source of truth for the AED balance.
            user["aed_balance"] = wallet_doc.get("balance", 0.0)
            user["xrp_address"] = wallet_doc.get("xrp_address")
            user["xrp_balance"] = _resolve_xrp_balance(db, wallet_doc, fetch_live_xrp)
        else:
            # No wallet exists yet.
            user["aed_balance"] = 0.0
            user["xrp_address"] = None
            user["xrp_balance"] = 0.0
        users_with_wallets.append(user)
    return users_with_wallets
| # ============================================================================ | |
| # ADMIN USER HELPERS | |
| # ============================================================================ | |
def get_primary_admin(db, session=None) -> Optional[Dict[str, Any]]:
    """Return one non-deleted user whose role is 'admin', or ``None``.

    ``session`` is forwarded to the query only when one is supplied.
    """
    admin_filter = {"role": "admin", "deleted": False}
    extra = {"session": session} if session else {}
    found = users_col(db).find_one(admin_filter, **extra)
    if not found:
        return None
    return _clean_object(found)
| # ============================================================================ | |
| # WALLET OPERATIONS | |
| # ============================================================================ | |
def wallets_col(db) -> Collection:
    """Return the ``wallets`` collection of ``db``."""
    collection = db.wallets
    return collection
def create_wallet(db, user_id: str, balance: float = 0.0, currency: str = "AED", xrp_address: Optional[str] = None, xrp_seed: Optional[str] = None) -> Dict[str, Any]:
    """Create a wallet for a user and link it on the user document.

    Two writes are performed: the wallet insert, then an update of the user's
    ``wallet_id``.  They are not wrapped in a transaction here.

    NOTE(review): ``xrp_seed`` is persisted exactly as passed and is included
    in the returned document.  Callers must supply an already-encrypted value.

    Returns:
        The cleaned wallet document.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Creating wallet for user %s, balance: %s %s", user_id, balance, currency)
    owner_id = _to_object_id(user_id)
    now = _now()
    doc = {
        "user_id": owner_id,
        "balance": balance,
        "currency": currency,
        "xrp_address": xrp_address,
        "xrp_seed": xrp_seed,  # Encrypted in production!
        "is_active": True,
        "created_at": now,
        "updated_at": now,
    }
    result = wallets_col(db).insert_one(doc)
    doc["_id"] = result.inserted_id
    # Point the user at the freshly created wallet.
    users_col(db).update_one(
        {"_id": owner_id},
        {"$set": {"wallet_id": result.inserted_id, "updated_at": now}},
    )
    logger.info("[REPO] Wallet created successfully with ID: %s", result.inserted_id)
    return _clean_object(doc)
def get_wallet_by_user(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the wallet whose ``user_id`` matches; return it cleaned, or ``None``."""
    found = wallets_col(db).find_one({"user_id": _to_object_id(user_id)})
    if not found:
        return None
    return _clean_object(found)
def update_wallet_balance(db, wallet_id: str, amount: float, operation: str = "add", session=None) -> Optional[Dict[str, Any]]:
    """Atomically add to or subtract from a wallet balance via ``$inc``.

    Args:
        db: Database connection.
        wallet_id: Wallet document id.
        amount: Amount to apply.
        operation: ``"add"`` adds ``amount``.  Any other value subtracts it,
            so a misspelled operation results in a debit.
        session: Optional session forwarded to the update.

    Returns:
        The cleaned, post-update wallet, or ``None`` if the wallet was not
        found.  No lower bound is enforced on the resulting balance.

    Raises:
        ValueError: If ``wallet_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] %sing %s to wallet %s", operation.capitalize(), amount, wallet_id)
    delta = amount if operation == "add" else -amount
    doc = wallets_col(db).find_one_and_update(
        {"_id": _to_object_id(wallet_id)},
        {
            "$inc": {"balance": delta},
            "$set": {"updated_at": _now()},
        },
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not doc:
        return None
    logger.info("[REPO] Wallet balance updated. New balance: %s", doc.get("balance"))
    return _clean_object(doc)
def update_wallet_xrp(db, wallet_id: str, xrp_address: str, xrp_seed: str) -> Optional[Dict[str, Any]]:
    """Attach an XRP address and seed to an existing wallet.

    NOTE(review): ``xrp_seed`` is stored exactly as passed; callers must
    supply an already-encrypted value.  The seed is never logged.

    Returns:
        The cleaned, post-update wallet, or ``None`` if the wallet was not found.

    Raises:
        ValueError: If ``wallet_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Adding XRP wallet to wallet %s: %s", wallet_id, xrp_address)
    doc = wallets_col(db).find_one_and_update(
        {"_id": _to_object_id(wallet_id)},
        {
            "$set": {
                "xrp_address": xrp_address,
                "xrp_seed": xrp_seed,  # Should be encrypted in production
                "updated_at": _now(),
            }
        },
        return_document=ReturnDocument.AFTER,
    )
    if not doc:
        return None
    logger.info("[REPO] XRP wallet added successfully")
    return _clean_object(doc)
| # ============================================================================ | |
| # PROPERTY OPERATIONS | |
| # ============================================================================ | |
def properties_col(db) -> Collection:
    """Return the ``properties`` collection of ``db``."""
    collection = db.properties
    return collection
def create_property(db, property_data: Dict[str, Any], created_by: str) -> Dict[str, Any]:
    """Insert a new property and return it in cleaned form.

    ``property_data`` is copied into the document.  The bookkeeping fields set
    here (``available_tokens``, ``created_by``, ``is_active``, ``deleted``,
    timestamps) override any same-named keys in ``property_data``.
    ``available_tokens`` starts equal to ``total_tokens`` (0 when absent).
    ``created_by`` is stored as an ObjectId when convertible, otherwise as given.
    """
    logger.info("[REPO] Creating property: %s", property_data.get("title"))
    now = _now()
    doc = {
        **property_data,
        "available_tokens": property_data.get("total_tokens", 0),
        "created_by": _coerce_object_id(created_by),
        "is_active": True,
        "deleted": False,
        "created_at": now,
        "updated_at": now,
    }
    result = properties_col(db).insert_one(doc)
    doc["_id"] = result.inserted_id
    logger.info("[REPO] Property created successfully with ID: %s", result.inserted_id)
    return _clean_object(doc)
def get_property_by_id(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a non-deleted property by id; return it cleaned, or ``None``."""
    lookup = {"_id": _to_object_id(property_id), "deleted": False}
    found = properties_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def get_property_by_id_optimized(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one non-deleted property with its related data in a single aggregation.

    The result embeds ``creator`` (the creating user), ``specifications``
    (single document), and the ``amenities``, ``images`` and ``documents``
    lists.  Only active amenities and images are included.  The creator's
    ``password_hash`` is stripped so credentials never leave the repository
    layer through a property read.

    Returns:
        The cleaned property document, or ``None`` if no property matched.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    active_only = [{"$match": {"is_active": True}}]
    pipeline = [
        {"$match": {"_id": _to_object_id(property_id), "deleted": False}},
        # Creator information
        {"$lookup": {
            "from": "users",
            "localField": "created_by",
            "foreignField": "_id",
            "as": "creator",
        }},
        {"$unwind": {"path": "$creator", "preserveNullAndEmptyArrays": True}},
        # Never expose the joined user's credential hash.
        {"$project": {"creator.password_hash": 0}},
        # Specifications (single document per property)
        {"$lookup": {
            "from": "property_specifications",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "specifications",
        }},
        {"$unwind": {"path": "$specifications", "preserveNullAndEmptyArrays": True}},
        # Active amenities
        {"$lookup": {
            "from": "amenities",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "amenities",
            "pipeline": active_only,
        }},
        # Active images
        {"$lookup": {
            "from": "property_images",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "images",
            "pipeline": active_only,
        }},
        # Documents
        {"$lookup": {
            "from": "documents",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "documents",
        }},
    ]
    docs = list(properties_col(db).aggregate(pipeline))
    return _clean_object(docs[0]) if docs else None
def list_properties(db, is_active: Optional[bool] = None, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return non-deleted properties, newest first, with pagination.

    When ``is_active`` is not ``None`` the result is additionally restricted
    to properties with that ``is_active`` value.
    """
    criteria = {"deleted": False}
    if is_active is not None:
        criteria["is_active"] = is_active
    cursor = properties_col(db).find(criteria)
    cursor = cursor.skip(skip).limit(limit)
    cursor = cursor.sort("created_at", DESCENDING)
    return [_clean_object(item) for item in list(cursor)]
def list_properties_optimized(db, is_active: Optional[bool] = None, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """List non-deleted properties with related data in a single aggregation.

    Properties are sorted by ``created_at`` descending and paginated before
    the joins run.  Each result embeds ``creator``, ``specifications``
    (single document), and the ``amenities``, ``images`` and ``documents``
    lists.  Only active amenities and images are included.  The creator's
    ``password_hash`` is stripped so credentials never leave the repository
    layer through a property listing.

    Args:
        db: Database connection.
        is_active: When not ``None``, restrict to this ``is_active`` value.
        skip: Number of properties to skip.
        limit: Maximum number of properties to return.
    """
    match_query = {"deleted": False}
    if is_active is not None:
        match_query["is_active"] = is_active

    active_only = [{"$match": {"is_active": True}}]
    pipeline = [
        {"$match": match_query},
        {"$sort": {"created_at": DESCENDING}},
        {"$skip": skip},
        {"$limit": limit},
        # Creator information
        {"$lookup": {
            "from": "users",
            "localField": "created_by",
            "foreignField": "_id",
            "as": "creator",
        }},
        {"$unwind": {"path": "$creator", "preserveNullAndEmptyArrays": True}},
        # Never expose the joined user's credential hash.
        {"$project": {"creator.password_hash": 0}},
        # Specifications (1-to-1)
        {"$lookup": {
            "from": "property_specifications",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "specifications",
        }},
        {"$unwind": {"path": "$specifications", "preserveNullAndEmptyArrays": True}},
        # Active amenities (1-to-many)
        {"$lookup": {
            "from": "amenities",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "amenities",
            "pipeline": active_only,
        }},
        # Active images (1-to-many)
        {"$lookup": {
            "from": "property_images",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "images",
            "pipeline": active_only,
        }},
        # Documents (1-to-many)
        {"$lookup": {
            "from": "documents",
            "localField": "_id",
            "foreignField": "property_id",
            "as": "documents",
        }},
    ]
    docs = list(properties_col(db).aggregate(pipeline))
    return [_clean_object(doc) for doc in docs]
def update_property(db, property_id: str, fields: Dict[str, Any], session=None) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to a property with ``$set`` and return the updated document.

    ``updated_at`` is always refreshed.  The caller's ``fields`` dict is left
    untouched.  The lookup is by ``_id`` only (soft-deleted properties are not
    filtered out).

    Returns:
        The cleaned, post-update document, or ``None`` if no property matched.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Updating property %s with fields: %s", property_id, list(fields.keys()))
    # Build a new dict so the timestamp does not leak into the caller's object.
    changes = {**fields, "updated_at": _now()}
    doc = properties_col(db).find_one_and_update(
        {"_id": _to_object_id(property_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not doc:
        return None
    logger.info("[REPO] Property updated successfully")
    return _clean_object(doc)
def delete_property(db, property_id: str) -> bool:
    """Soft-delete a property: set ``deleted`` to True and ``is_active`` to False.

    Returns:
        True if a property with this id was matched, False otherwise.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Soft deleting property %s", property_id)
    result = properties_col(db).update_one(
        {"_id": _to_object_id(property_id)},
        {"$set": {"deleted": True, "is_active": False, "updated_at": _now()}},
    )
    success = result.matched_count > 0
    if success:
        logger.info("[REPO] Property soft deleted successfully")
    else:
        logger.warning("[REPO] Property not found for deletion")
    return success
def decrement_available_tokens(db, property_id: str, amount: int, session=None) -> bool:
    """Atomically take ``amount`` tokens from a property's ``available_tokens``.

    The update only matches when ``available_tokens >= amount``, so the check
    and the decrement happen in one server-side operation.

    Returns:
        True if the decrement was applied.  False if the property does not
        exist or does not have enough tokens available.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Decrementing %s tokens from property %s", amount, property_id)
    result = properties_col(db).update_one(
        {
            "_id": _to_object_id(property_id),
            "available_tokens": {"$gte": amount},  # guard: never go below zero
        },
        {
            "$inc": {"available_tokens": -amount},
            "$set": {"updated_at": _now()},
        },
        session=session,
    )
    success = result.matched_count > 0
    if success:
        logger.info("[REPO] Successfully decremented tokens")
    else:
        logger.error(
            "[REPO] ERROR: Failed to decrement tokens - property not found or insufficient available"
        )
    return success
| # ============================================================================ | |
| # PROPERTY SPECIFICATIONS | |
| # ============================================================================ | |
def property_specifications_col(db) -> Collection:
    """Return the ``property_specifications`` collection of ``db``."""
    collection = db.property_specifications
    return collection
def create_property_specification(db, property_id: str, spec_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert the specification document for a property and return it cleaned.

    ``spec_data`` is copied into the document.  ``property_id`` and the
    timestamps set here override any same-named keys in ``spec_data``.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Creating specifications for property %s", property_id)
    now = _now()
    doc = {
        **spec_data,
        "property_id": _to_object_id(property_id),
        "created_at": now,
        "updated_at": now,
    }
    result = property_specifications_col(db).insert_one(doc)
    doc["_id"] = result.inserted_id
    return _clean_object(doc)
def get_property_specification(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the specification document of a property; return it cleaned, or ``None``."""
    lookup = {"property_id": _to_object_id(property_id)}
    found = property_specifications_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
| # ============================================================================ | |
| # AMENITIES | |
| # ============================================================================ | |
def amenities_col(db) -> Collection:
    """Return the ``amenities`` collection of ``db``."""
    collection = db.amenities
    return collection
def create_amenities(db, property_id: str, amenity_names: List[str]) -> List[Dict[str, Any]]:
    """Insert one active amenity document per name for a property.

    All documents share the same timestamp and are written with a single
    ``insert_many``.  An empty ``amenity_names`` performs no write and
    returns ``[]``.

    Returns:
        The cleaned amenity documents, in input order.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId (only checked
            when there is at least one name to insert).
    """
    logger.info("[REPO] Creating %s amenities for property %s", len(amenity_names), property_id)
    if not amenity_names:
        return []

    # Convert once instead of once per amenity.
    owner_id = _to_object_id(property_id)
    now = _now()
    docs = [
        {
            "property_id": owner_id,
            "name": name,
            "is_active": True,
            "created_at": now,
            "updated_at": now,
        }
        for name in amenity_names
    ]
    result = amenities_col(db).insert_many(docs)
    for doc, inserted_id in zip(docs, result.inserted_ids):
        doc["_id"] = inserted_id
    return [_clean_object(doc) for doc in docs]
def get_property_amenities(db, property_id: str) -> List[Dict[str, Any]]:
    """Return the active amenities of a property as cleaned documents."""
    criteria = {"property_id": _to_object_id(property_id), "is_active": True}
    found = list(amenities_col(db).find(criteria))
    return list(map(_clean_object, found))
| # ============================================================================ | |
| # PROPERTY IMAGES | |
| # ============================================================================ | |
def property_images_col(db) -> Collection:
    """Return the ``property_images`` collection of ``db``."""
    collection = db.property_images
    return collection
def create_property_images(db, property_id: str, images_data: List[Dict[str, Any]], uploaded_by: str) -> List[Dict[str, Any]]:
    """Insert one active image document per entry of ``images_data``.

    Each entry is copied into its document.  ``property_id``, ``uploaded_by``,
    ``is_active`` and the timestamps set here override same-named keys.
    ``uploaded_by`` is stored as an ObjectId when convertible, otherwise as
    given.  An empty ``images_data`` performs no write and returns ``[]``.

    Returns:
        The cleaned image documents, in input order.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId (only checked
            when there is at least one image to insert).
    """
    logger.info("[REPO] Creating %s images for property %s", len(images_data), property_id)
    if not images_data:
        return []

    # Convert once: avoids repeated work and a duplicate warning per image
    # when ``uploaded_by`` is not a valid ObjectId.
    owner_id = _to_object_id(property_id)
    uploader = _coerce_object_id(uploaded_by)
    now = _now()
    docs = [
        {
            **img_data,
            "property_id": owner_id,
            "uploaded_by": uploader,
            "is_active": True,
            "created_at": now,
            "updated_at": now,
        }
        for img_data in images_data
    ]
    result = property_images_col(db).insert_many(docs)
    for doc, inserted_id in zip(docs, result.inserted_ids):
        doc["_id"] = inserted_id
    return [_clean_object(doc) for doc in docs]
def get_property_images(db, property_id: str) -> List[Dict[str, Any]]:
    """Return the active images of a property as cleaned documents."""
    criteria = {"property_id": _to_object_id(property_id), "is_active": True}
    found = list(property_images_col(db).find(criteria))
    return list(map(_clean_object, found))
| # ============================================================================ | |
| # INVESTMENTS | |
| # ============================================================================ | |
def investments_col(db) -> Collection:
    """Return the ``investments`` collection of ``db``."""
    collection = db.investments
    return collection
def create_investment(db, user_id: str, property_id: str, tokens_purchased: int, amount: float, status: str = "confirmed", profit_share: float = 0.0, session=None) -> Dict[str, Any]:
    """Insert a new investment record and return it in cleaned form.

    Args:
        db: Database connection.
        user_id: Investing user's id.
        property_id: Property being invested in.
        tokens_purchased: Number of tokens bought.
        amount: Amount paid.
        status: Stored investment status.
        profit_share: Stored profit share value.
        session: Optional session forwarded to the insert.

    Raises:
        ValueError: If ``user_id`` or ``property_id`` is not a valid ObjectId.
    """
    logger.info(
        "[REPO] Creating investment: user=%s, property=%s, tokens=%s, amount=%s",
        user_id, property_id, tokens_purchased, amount,
    )
    now = _now()
    doc = {
        "user_id": _to_object_id(user_id),
        "property_id": _to_object_id(property_id),
        "tokens_purchased": tokens_purchased,
        "amount": amount,
        "status": status,
        "profit_share": profit_share,
        "created_at": now,
        "updated_at": now,
    }
    result = investments_col(db).insert_one(doc, session=session)
    doc["_id"] = result.inserted_id
    logger.info("[REPO] Investment created with ID: %s", result.inserted_id)
    return _clean_object(doc)
def get_user_investments(db, user_id: str) -> List[Dict[str, Any]]:
    """Return every investment of a user, newest first, as cleaned documents."""
    criteria = {"user_id": _to_object_id(user_id)}
    cursor = investments_col(db).find(criteria).sort("created_at", DESCENDING)
    return list(map(_clean_object, list(cursor)))
def get_investment_by_user_and_property(db, user_id: str, property_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one investment matching both user and property; cleaned, or ``None``."""
    criteria = {
        "user_id": _to_object_id(user_id),
        "property_id": _to_object_id(property_id),
    }
    found = investments_col(db).find_one(criteria)
    if not found:
        return None
    return _clean_object(found)
def update_investment(db, investment_id: str, fields: Dict[str, Any], session=None) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to an investment with ``$set`` and return the updated document.

    ``updated_at`` is always refreshed.  The caller's ``fields`` dict is left
    untouched.

    Returns:
        The cleaned, post-update document, or ``None`` if no investment matched.

    Raises:
        ValueError: If ``investment_id`` is not a valid ObjectId.
    """
    # Build a new dict so the timestamp does not leak into the caller's object.
    changes = {**fields, "updated_at": _now()}
    doc = investments_col(db).find_one_and_update(
        {"_id": _to_object_id(investment_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(doc) if doc else None
def reduce_investment(db, user_id: str, property_id: str, tokens_to_reduce: int, session=None) -> Optional[Dict[str, Any]]:
    """Reduce a user's token holding in a property when tokens are sold back.

    The holding check and the decrement are one atomic, guarded ``$inc``
    (the same pattern ``decrement_available_tokens`` uses), executed in
    ``session`` when one is given.  A read-then-write sequence could lose a
    concurrent update or run its read outside the caller's transaction.

    If the holding reaches zero the investment document is deleted.

    Returns:
        The cleaned investment after the reduction (``tokens_purchased`` is 0
        when the investment was removed), or ``None`` if no investment exists
        for this user/property or it holds fewer than ``tokens_to_reduce``.

    Raises:
        ValueError: If ``user_id`` or ``property_id`` is not a valid ObjectId.
    """
    logger.info(
        "[REPO] Reducing investment: user=%s, property=%s, tokens=%s",
        user_id, property_id, tokens_to_reduce,
    )
    collection = investments_col(db)
    doc = collection.find_one_and_update(
        {
            "user_id": _to_object_id(user_id),
            "property_id": _to_object_id(property_id),
            "tokens_purchased": {"$gte": tokens_to_reduce},  # guard: never go negative
        },
        {
            "$inc": {"tokens_purchased": -tokens_to_reduce},
            "$set": {"updated_at": _now()},
        },
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if doc is None:
        logger.warning(
            "[REPO] Cannot reduce %s tokens: no investment found or not enough tokens held",
            tokens_to_reduce,
        )
        return None

    if doc.get("tokens_purchased", 0) == 0:
        # Nothing left: drop the record.  The extra filter keeps a concurrent
        # top-up (which made the count non-zero again) from being deleted.
        logger.info("[REPO] Removing investment (all tokens sold)")
        collection.delete_one({"_id": doc["_id"], "tokens_purchased": 0}, session=session)
    else:
        logger.info("[REPO] Investment tokens reduced to %s", doc.get("tokens_purchased"))
    return _clean_object(doc)
def upsert_investment(db, user_id: str, property_id: str, tokens_purchased: int, amount: float, session=None) -> Dict[str, Any]:
    """Add tokens and amount to a user's investment in a property, creating it if needed.

    An existing investment has ``tokens_purchased`` and ``amount`` incremented.
    When none exists, one is inserted with ``profit_share`` 0.0.  In both
    cases ``status`` is set to ``"confirmed"``.

    Returns:
        The cleaned investment document after the upsert.

    Raises:
        ValueError: If ``user_id`` or ``property_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Upserting investment for user %s, property %s", user_id, property_id)
    now = _now()
    doc = investments_col(db).find_one_and_update(
        {
            "user_id": _to_object_id(user_id),
            "property_id": _to_object_id(property_id),
        },
        {
            "$inc": {
                "tokens_purchased": tokens_purchased,
                "amount": amount,
            },
            "$set": {
                "status": "confirmed",
                "updated_at": now,
            },
            # Applied only when the upsert inserts a new document.
            "$setOnInsert": {
                "profit_share": 0.0,
                "created_at": now,
            },
        },
        upsert=True,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(doc)
def get_user_portfolio_optimized(db, user_id: str) -> List[Dict[str, Any]]:
    """Return a user's investments joined with their property data in one aggregation.

    Each investment (newest first) carries its ``property``, the property's
    ``specifications`` and ``tokens`` documents, and the lists of active
    ``amenities`` and ``images``.  Investments whose property is missing or
    soft-deleted are dropped.
    """

    def plain_join(source, foreign_field, alias):
        # Equality join from the investment's property_id.
        return {"$lookup": {
            "from": source,
            "localField": "property_id",
            "foreignField": foreign_field,
            "as": alias,
        }}

    def active_join(source, alias):
        # Correlated join that keeps only active rows of the same property.
        return {"$lookup": {
            "from": source,
            "let": {"prop_id": "$property_id"},
            "pipeline": [
                {"$match": {
                    "$expr": {"$eq": ["$property_id", "$$prop_id"]},
                    "is_active": True,
                }}
            ],
            "as": alias,
        }}

    def flatten(field, keep_missing):
        return {"$unwind": {"path": f"${field}", "preserveNullAndEmptyArrays": keep_missing}}

    kept_fields = (
        "_id", "user_id", "property_id", "tokens_purchased", "amount", "status",
        "created_at", "updated_at", "property", "specifications", "amenities",
        "images", "tokens",
    )

    stages = [
        {"$match": {"user_id": _to_object_id(user_id)}},
        {"$sort": {"created_at": DESCENDING}},
        plain_join("properties", "_id", "property"),
        flatten("property", False),
        {"$match": {"property.deleted": False}},
        plain_join("property_specifications", "property_id", "specifications"),
        flatten("specifications", True),
        active_join("amenities", "amenities"),
        active_join("property_images", "images"),
        plain_join("property_tokens", "property_id", "tokens"),
        flatten("tokens", True),
        {"$project": {field: 1 for field in kept_fields}},
    ]
    rows = list(investments_col(db).aggregate(stages))
    return list(map(_clean_object, rows))
| # ============================================================================ | |
| # TRANSACTIONS | |
| # ============================================================================ | |
def transactions_col(db) -> Collection:
    """Return the ``transactions`` collection of ``db``."""
    collection = db.transactions
    return collection
def create_transaction(db, user_id: str, wallet_id: Optional[str], tx_type: str, amount: float,
                       property_id: Optional[str] = None, status: str = "pending",
                       metadata: Optional[Dict[str, Any]] = None, session=None) -> Dict[str, Any]:
    """Insert a transaction record and return it in cleaned form.

    Args:
        db: Database connection.
        user_id: Owner of the transaction.
        wallet_id: Related wallet id; stored as ``None`` when falsy.
        tx_type: Stored under the ``type`` key.
        amount: Transaction amount.
        property_id: Related property id; stored as ``None`` when falsy.
        status: Stored transaction status.
        metadata: Extra data; an empty dict is stored when falsy.
        session: Optional session forwarded to the insert.

    Raises:
        ValueError: If a supplied id is not a valid ObjectId.
    """
    logger.info("[REPO] Creating transaction: type=%s, amount=%s, status=%s", tx_type, amount, status)
    now = _now()
    doc = {
        "user_id": _to_object_id(user_id),
        "wallet_id": _to_object_id(wallet_id) if wallet_id else None,
        "type": tx_type,
        "amount": amount,
        "property_id": _to_object_id(property_id) if property_id else None,
        "status": status,
        "metadata": metadata or {},
        "created_at": now,
        "updated_at": now,
    }
    result = transactions_col(db).insert_one(doc, session=session)
    doc["_id"] = result.inserted_id
    logger.info("[REPO] Transaction created with ID: %s", result.inserted_id)
    return _clean_object(doc)
def insert_transaction(db, transaction_data: Dict[str, Any], session=None) -> Dict[str, Any]:
    """Compatibility layer for legacy callers that pass a transaction dict.

    Supported keys in ``transaction_data``:
        - ``user_id``, ``wallet_id``, ``type``, ``property_id``, ``status``
        - ``amount`` (float), used as-is when present
        - ``total_amount_aed`` (integer fils), used only when ``amount`` is
          absent: converted to ``amount = fils / 100`` and recorded in
          ``metadata["total_amount_fils"]``
        - ``metadata`` plus the ancillary keys listed below, which are folded
          into the stored metadata when not ``None``

    Neither ``transaction_data`` nor its ``metadata`` dict is modified.  A
    ``total_amount_aed`` that cannot be converted is logged and the
    transaction is stored with amount 0.0.

    Returns:
        The cleaned transaction document created by ``create_transaction``.
    """
    amount = transaction_data.get("amount")
    total_amount_fils = transaction_data.get("total_amount_aed")
    # Copy: the keys added below must not leak into the caller's metadata dict.
    metadata = dict(transaction_data.get("metadata") or {})

    if total_amount_fils is not None and amount is None:
        try:
            amount = float(total_amount_fils) / 100.0
            metadata["total_amount_fils"] = int(total_amount_fils)
        except (TypeError, ValueError, OverflowError) as exc:
            # Keep the legacy fallback, but make the data problem visible.
            logger.warning(
                "Invalid total_amount_aed %r; recording amount as 0.0 (%s)",
                total_amount_fils, exc,
            )
            amount = 0.0

    # Embed ancillary fields into metadata to avoid schema drift.
    ancillary_keys = (
        "notes", "payment_method", "payment_reference", "price_per_token", "xrp_cost",
        "blockchain_tx_hash", "direction", "counterparty_user_id", "counterparty_username",
        "counterparty_email", "property_name",
    )
    for key in ancillary_keys:
        value = transaction_data.get(key)
        if value is not None:
            metadata[key] = value

    return create_transaction(
        db,
        user_id=transaction_data.get("user_id"),
        wallet_id=transaction_data.get("wallet_id"),
        tx_type=transaction_data.get("type"),
        amount=amount or 0.0,
        property_id=transaction_data.get("property_id"),
        status=transaction_data.get("status", "pending"),
        metadata=metadata,
        session=session,
    )
def get_user_transactions(db, user_id: str, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return a page of a user's transactions, newest first, as cleaned documents."""
    criteria = {"user_id": _to_object_id(user_id)}
    cursor = transactions_col(db).find(criteria)
    cursor = cursor.skip(skip).limit(limit)
    cursor = cursor.sort("created_at", DESCENDING)
    return list(map(_clean_object, list(cursor)))
def update_transaction_status(db, transaction_id: str, status: str, session=None) -> Optional[Dict[str, Any]]:
    """Set a transaction's ``status`` and refresh ``updated_at``.

    Returns:
        The cleaned, post-update transaction, or ``None`` if none matched.

    Raises:
        ValueError: If ``transaction_id`` is not a valid ObjectId.
    """
    logger.info("[REPO] Updating transaction %s status to %s", transaction_id, status)
    doc = transactions_col(db).find_one_and_update(
        {"_id": _to_object_id(transaction_id)},
        {"$set": {"status": status, "updated_at": _now()}},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(doc) if doc else None
| # ============================================================================ | |
| # PORTFOLIOS | |
| # ============================================================================ | |
def portfolios_col(db) -> Collection:
    """Return the ``portfolios`` collection of ``db``."""
    collection = db.portfolios
    return collection
def create_or_update_portfolio(db, user_id: str, session=None) -> Dict[str, Any]:
    """Recalculate a user's portfolio totals and upsert them.

    ``total_invested`` is the sum of each investment's ``amount``.
    ``total_current_value`` is the sum of ``tokens_purchased`` multiplied by
    the property's current ``token_price``; investments whose property cannot
    be found contribute nothing to the current value.

    Args:
        db: Database handle.
        user_id: User ID string.
        session: Optional MongoDB session forwarded to the upsert.

    Returns:
        The cleaned portfolio document after the upsert.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId string.
    """
    logger.info("[REPO] Calculating portfolio for user %s", user_id)

    investments = get_user_investments(db, user_id)
    # ``or 0`` tolerates fields stored as None as well as missing ones.
    total_invested = sum((inv.get("amount") or 0) for inv in investments)

    # A user may hold several investments in the same property; look each
    # property up once instead of once per investment.
    property_cache: Dict[Any, Any] = {}
    total_current_value = 0
    for inv in investments:
        property_id = inv.get("property_id")
        if property_id is None:
            logger.warning(
                "[REPO] Investment %s has no property_id; skipped in portfolio valuation",
                inv.get("id"),
            )
            continue
        if property_id not in property_cache:
            property_cache[property_id] = get_property_by_id(db, property_id)
        property_obj = property_cache[property_id]
        if property_obj:
            tokens = inv.get("tokens_purchased") or 0
            token_price = property_obj.get("token_price") or 0
            total_current_value += tokens * token_price

    total_profit = total_current_value - total_invested
    now = _now()
    doc = portfolios_col(db).find_one_and_update(
        {"user_id": _to_object_id(user_id)},
        {
            "$set": {
                "total_invested": total_invested,
                "total_current_value": total_current_value,
                "total_profit": total_profit,
                "updated_at": now,
            },
            # created_at is written only when the upsert inserts a new document.
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    logger.info(
        "[REPO] Portfolio updated: invested=%s, current=%s, profit=%s",
        total_invested,
        total_current_value,
        total_profit,
    )
    return _clean_object(doc)
def get_user_portfolio(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the portfolio document stored for a user.

    Returns:
        The cleaned document, or ``None`` when no portfolio matches.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId string.
    """
    portfolio = portfolios_col(db).find_one({"user_id": _to_object_id(user_id)})
    if not portfolio:
        return None
    return _clean_object(portfolio)
| # ============================================================================ | |
| # DOCUMENTS | |
| # ============================================================================ | |
def documents_col(db) -> Collection:
    """Return the ``documents`` collection of ``db``."""
    collection = db.documents
    return collection
def create_document(db, property_id: str, file_type: str, file_url: str, uploaded_by: str) -> Dict[str, Any]:
    """Insert a single document record linked to a property.

    Args:
        db: Database handle.
        property_id: Property ID string; must be a valid ObjectId.
        file_type: Type label stored with the record.
        file_url: Location of the file.
        uploaded_by: Uploader ID; stored as an ObjectId when it parses as
            one, otherwise stored unchanged.

    Returns:
        The cleaned inserted record.
    """
    print(f"[REPO] Creating document for property {property_id}, type: {file_type}")
    timestamp = _now()
    record = dict(
        property_id=_to_object_id(property_id),
        file_type=file_type,
        file_url=file_url,
        uploaded_by=_coerce_object_id(uploaded_by),
        created_at=timestamp,
        updated_at=timestamp,
    )
    insert_result = documents_col(db).insert_one(record)
    record["_id"] = insert_result.inserted_id
    return _clean_object(record)
def create_property_documents(db, property_id: str, documents_data: List[Dict[str, str]], uploaded_by: str) -> List[Dict[str, Any]]:
    """Insert several document records for one property in a single batch.

    Each entry of ``documents_data`` may provide ``file_type`` (default
    ``"pdf"``) and ``file_url`` (default ``""``). Nothing is written when
    ``documents_data`` is empty.

    Returns:
        The cleaned inserted records, in input order.
    """
    print(f"[REPO] Creating {len(documents_data)} documents for property {property_id}")
    timestamp = _now()
    records = [
        {
            "property_id": _to_object_id(property_id),
            "file_type": entry.get("file_type", "pdf"),
            "file_url": entry.get("file_url", ""),
            "uploaded_by": _coerce_object_id(uploaded_by),
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        for entry in documents_data
    ]
    if records:
        insert_result = documents_col(db).insert_many(records)
        # Pair every record with the id the server reported for it.
        for record, inserted_id in zip(records, insert_result.inserted_ids):
            record["_id"] = inserted_id
    return [_clean_object(record) for record in records]
def get_property_documents(db, property_id: str) -> List[Dict[str, Any]]:
    """Return every document record attached to a property, cleaned."""
    selector = {"property_id": _to_object_id(property_id)}
    return [_clean_object(record) for record in documents_col(db).find(selector)]
| # ============================================================================ | |
| # TRANSACTIONS (MISSING FUNCTIONS) | |
| # ============================================================================ | |
def list_user_transactions(db, user_id: str, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return a page of a user's transactions sorted by ``created_at`` descending."""
    selector = {"user_id": _to_object_id(user_id)}
    cursor = transactions_col(db).find(selector).skip(skip).limit(limit)
    cursor = cursor.sort("created_at", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def list_aed_wallet_transactions(db, user_id: str, limit: int = 50) -> List[Dict[str, Any]]:
    """List a user's AED wallet-related transactions, newest first.

    Only transactions whose ``type`` is ``wallet_add``, ``wallet_deduct`` or
    ``investment`` are selected; the filter does not inspect ``metadata``.
    At most ``limit`` documents are requested.
    """
    wallet_types = ["wallet_add", "wallet_deduct", "investment"]
    collection = transactions_col(db)
    selector = {
        "user_id": _to_object_id(user_id),
        "type": {"$in": wallet_types},
    }
    cursor = collection.find(selector).sort("created_at", DESCENDING).limit(limit)
    return [_clean_object(entry) for entry in cursor]
def record_admin_wallet_event(
    db,
    delta_fils: int,
    event_type: str,
    notes: str,
    *,
    counterparty_user: Optional[Dict[str, Any]] = None,
    property_obj: Optional[Dict[str, Any]] = None,
    payment_method: Optional[str] = None,
    payment_reference: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    session=None,
):
    """Adjust an admin's AED wallet balance and record a mirror transaction.

    The admin is the creator of ``property_obj`` when that user can be
    loaded; otherwise the primary admin is used. A non-zero delta is applied
    to the admin wallet's ``balance`` (stored in AED, i.e. fils / 100). A
    transaction with status ``completed`` is then recorded via
    ``insert_transaction``. The transaction is recorded even when the admin
    has no wallet (only a warning is logged) and when the delta is zero (in
    which case ``wallet_id`` is left as ``None``).

    Args:
        db: Database handle.
        delta_fils: Signed amount in fils; negative values are debits.
            Values that cannot be converted to ``int`` are treated as 0.
        event_type: Stored as the transaction ``type``.
        notes: Free-text note stored in the transaction metadata.
        counterparty_user: Optional user dict; ``id``, ``name`` and ``email``
            are copied into the metadata when present.
        property_obj: Optional property dict; ``created_by`` selects the
            admin, ``title`` and ``id`` are recorded on the transaction.
        payment_method: Optional payment method stored in metadata.
        payment_reference: Optional payment reference stored in metadata.
        metadata: Extra metadata merged last, so its keys override the
            defaults built here.
        session: Optional MongoDB session used for the wallet read, the
            balance update, the primary-admin lookup and the insert.

    Returns:
        Whatever ``insert_transaction`` returns, or ``None`` when no admin
        user can be resolved.
    """
    try:
        delta = int(delta_fils or 0)
    except (TypeError, ValueError, OverflowError):
        # Best effort: an unusable delta records a zero-amount event instead of failing.
        logger.warning(
            "record_admin_wallet_event: invalid delta_fils %r; treating as 0", delta_fils
        )
        delta = 0

    # Prefer the property creator; fall back to the primary admin.
    admin_doc = None
    if property_obj and property_obj.get('created_by'):
        admin_doc = get_user_by_id(db, property_obj['created_by'])
        if admin_doc:
            logger.info(
                "record_admin_wallet_event: crediting property creator %s",
                admin_doc.get('email'),
            )
    if not admin_doc:
        admin_doc = get_primary_admin(db, session=session)
        if admin_doc:
            logger.info(
                "record_admin_wallet_event: crediting primary admin %s",
                admin_doc.get('email'),
            )
    if not admin_doc:
        logger.warning("record_admin_wallet_event: no admin user found")
        return None

    direction = "credit" if delta >= 0 else "debit"
    amount_abs = abs(delta)

    admin_wallet_id = None
    if delta != 0:
        # Read the wallet through the same session as the update so both take
        # part in the caller's transaction when one is active.
        admin_wallet = wallets_col(db).find_one(
            {"user_id": _to_object_id(admin_doc["id"])},
            session=session,
        )
        if admin_wallet:
            admin_wallet_id = str(admin_wallet["_id"])
            # The wallet balance is kept in AED; convert from fils.
            delta_aed = float(delta) / 100.0
            wallets_col(db).update_one(
                {"_id": admin_wallet["_id"]},
                {"$inc": {"balance": delta_aed}},
                session=session,
            )
        else:
            logger.warning(
                "record_admin_wallet_event: admin %s has no wallet", admin_doc['id']
            )

    tx_metadata: Dict[str, Any] = {
        "direction": direction,
        "notes": notes,
    }
    if property_obj:
        tx_metadata["property_name"] = property_obj.get("title")
    if counterparty_user:
        tx_metadata["counterparty_user_id"] = counterparty_user.get("id")
        if counterparty_user.get("name"):
            tx_metadata["counterparty_username"] = counterparty_user.get("name")
        if counterparty_user.get("email"):
            tx_metadata["counterparty_email"] = counterparty_user.get("email")
    if payment_method:
        tx_metadata["payment_method"] = payment_method
    if payment_reference:
        tx_metadata["payment_reference"] = payment_reference
    if metadata:
        # Caller-supplied metadata wins over the defaults above.
        tx_metadata.update(metadata)

    tx_data: Dict[str, Any] = {
        "user_id": admin_doc["id"],
        "wallet_id": admin_wallet_id,
        "type": event_type,
        "amount": float(amount_abs) / 100.0,  # absolute amount, in AED
        "status": "completed",
        "metadata": tx_metadata,
    }
    if property_obj:
        tx_data["property_id"] = property_obj.get("id")
    return insert_transaction(db, tx_data, session=session)
def upsert_token(db, property_id: str, token_data: Dict[str, Any]) -> Dict[str, Any]:
    """Create or update the token record of a property.

    The record lives in ``db.tokens`` and is keyed by ``property_id``, which
    is stored exactly as passed in (no ObjectId conversion).

    Args:
        db: Database handle.
        property_id: Property identifier used as the lookup key.
        token_data: Fields to store. ``property_id`` and ``updated_at`` are
            always overwritten; a ``created_at`` key is ignored because that
            field is written only on insert.

    Returns:
        The cleaned token document after the upsert.
    """
    logger.info("[REPO] Upserting token record for property %s", property_id)
    now = _now()
    # created_at is owned by $setOnInsert; naming the same path in $set as
    # well makes MongoDB reject the update with a path conflict.
    fields = {key: value for key, value in (token_data or {}).items() if key != "created_at"}
    fields["property_id"] = property_id
    fields["updated_at"] = now
    doc = db.tokens.find_one_and_update(
        {"property_id": property_id},
        {
            "$set": fields,
            "$setOnInsert": {"created_at": now},
        },
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    return _clean_object(doc)
def get_property_token(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Look up the token record of a property in ``db.tokens``.

    ``property_id`` is matched exactly as given. Returns the cleaned
    document, or ``None`` when there is no match.
    """
    token_doc = db.tokens.find_one({"property_id": property_id})
    if not token_doc:
        return None
    return _clean_object(token_doc)
| # ============================================================================ | |
| # STATISTICS | |
| # ============================================================================ | |
def get_admin_stats(db) -> Dict[str, Any]:
    """Compute platform-wide figures for the admin dashboard.

    Counts users and properties with ``deleted == False``, counts all
    investments, and aggregates ``tokens_purchased`` and ``amount`` across
    the investments collection. ``total_revenue`` is 2% of the total volume.
    """
    print("[REPO] Calculating admin statistics...")
    users = users_col(db)
    stats = {
        "total_users": users.count_documents({"deleted": False}),
        "total_properties": properties_col(db).count_documents({"deleted": False}),
        "total_investments": investments_col(db).count_documents({}),
        "active_users": users.count_documents({"is_active": True, "deleted": False}),
        "total_tokens_sold": 0,
        "total_volume": 0.0,
        "total_revenue": 0.0,
    }

    # A single group over every investment yields at most one totals row.
    group_stage = {
        "$group": {
            "_id": None,
            "total_tokens": {"$sum": "$tokens_purchased"},
            "total_amount": {"$sum": "$amount"},
        }
    }
    rows = list(investments_col(db).aggregate([group_stage]))
    if rows:
        totals = rows[0]
        stats["total_tokens_sold"] = totals.get("total_tokens", 0)
        stats["total_volume"] = totals.get("total_amount", 0.0)
        stats["total_revenue"] = stats["total_volume"] * 0.02  # 2% platform fee

    print(f"[REPO] Stats calculated: {stats}")
    return stats
def get_admin_specific_stats(db, admin_user_id: str) -> Dict[str, Any]:
    """Compute dashboard figures limited to one admin's properties.

    Args:
        db: Database handle.
        admin_user_id: Admin's user ID; matched against ``created_by`` as an
            ObjectId when it parses as one, otherwise as given.

    Returns:
        A dict with the same keys as ``get_admin_stats``. ``total_users`` is
        platform-wide; ``active_users`` is the number of distinct users who
        invested in this admin's properties; the remaining figures cover
        only investments in this admin's non-deleted properties.
        ``total_revenue`` is 2% of ``total_volume``.
    """
    logger.info("[REPO] Calculating admin-specific statistics for admin: %s", admin_user_id)
    admin_oid = _coerce_object_id(admin_user_id)

    # Only the ids are needed, so project away the rest of each property.
    property_cursor = properties_col(db).find(
        {"created_by": admin_oid, "deleted": False},
        {"_id": 1},
    )
    admin_property_ids = [prop["_id"] for prop in property_cursor]
    logger.info("[REPO] Found %d properties created by admin", len(admin_property_ids))

    if admin_property_ids:
        # Fetch just the fields used by the totals below.
        admin_investments = list(investments_col(db).find(
            {"property_id": {"$in": admin_property_ids}},
            {"user_id": 1, "tokens_purchased": 1, "amount": 1},
        ))
    else:
        # No properties means no investments; skip the query entirely.
        admin_investments = []

    unique_buyers = len({inv["user_id"] for inv in admin_investments if inv.get("user_id")})
    # ``or 0`` tolerates fields stored as None as well as missing ones.
    total_tokens_sold = sum((inv.get("tokens_purchased") or 0) for inv in admin_investments)
    total_volume = sum((inv.get("amount") or 0) for inv in admin_investments)
    total_revenue = total_volume * 0.02  # 2% platform fee

    stats = {
        "total_users": users_col(db).count_documents({"deleted": False}),  # platform-wide
        "total_properties": len(admin_property_ids),
        "total_investments": len(admin_investments),
        "active_users": unique_buyers,
        "total_tokens_sold": total_tokens_sold,
        "total_volume": total_volume,
        "total_revenue": total_revenue,
    }
    logger.info("[REPO] Admin-specific stats calculated: %s", stats)
    return stats
def get_properties_by_creator(db, creator_id: str) -> List[Dict[str, Any]]:
    """Return the non-deleted properties whose ``created_by`` matches ``creator_id``.

    ``creator_id`` is matched as an ObjectId when it parses as one,
    otherwise as given.
    """
    selector = {"created_by": _coerce_object_id(creator_id), "deleted": False}
    return [_clean_object(record) for record in properties_col(db).find(selector)]
| # ============================================================================ | |
| # RENT DISTRIBUTION OPERATIONS | |
| # ============================================================================ | |
def rent_distributions_col(db) -> Collection:
    """Return the ``rent_distributions`` collection of ``db``."""
    collection = db.rent_distributions
    return collection
def rent_payments_col(db) -> Collection:
    """Return the ``rent_payments`` collection of ``db``."""
    collection = db.rent_payments
    return collection
def get_investors_by_property(db, property_id: str) -> List[Dict[str, Any]]:
    """Return one summary row per user who invested in a property.

    Investments are grouped by ``user_id`` and joined to the ``users``
    collection. Groups with no matching user document are dropped by the
    ``$unwind`` stage.

    Args:
        db: Database handle.
        property_id: Property ID string; must be a valid ObjectId.

    Returns:
        A list of dicts with ``user_id`` (str), ``tokens_purchased``,
        ``total_invested``, ``wallet_address``, ``email`` and ``name``.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId string.
    """
    logger.info("[REPO] Getting investors for property: %s", property_id)
    property_oid = _to_object_id(property_id)

    pipeline = [
        {"$match": {"property_id": property_oid}},
        {"$group": {
            "_id": "$user_id",
            "total_tokens": {"$sum": "$tokens_purchased"},
            # The other investment readers in this module use ``amount``;
            # ``investment_amount`` is kept only as a fallback for documents
            # that might carry the older name.
            "total_invested": {
                "$sum": {"$ifNull": ["$amount", {"$ifNull": ["$investment_amount", 0]}]}
            },
        }},
        {"$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "user_info",
        }},
        {"$unwind": "$user_info"},
        {"$project": {
            "user_id": "$_id",
            "tokens_purchased": "$total_tokens",
            "total_invested": "$total_invested",
            "wallet_address": "$user_info.wallet_address",
            "email": "$user_info.email",
            "name": "$user_info.name",
        }},
    ]

    result = [
        {
            "user_id": str(row["user_id"]),
            "tokens_purchased": row.get("tokens_purchased", 0),
            "total_invested": row.get("total_invested", 0),
            "wallet_address": row.get("wallet_address"),
            "email": row.get("email"),
            "name": row.get("name"),
        }
        for row in investments_col(db).aggregate(pipeline)
    ]
    logger.info("[REPO] Found %d investors for property %s", len(result), property_id)
    return result
def create_rent_distribution(db, property_id: str, total_rent_amount: float,
                             rent_period_start: str, rent_period_end: str,
                             total_tokens: int, rent_per_token: float,
                             distribution_date: datetime, notes: Optional[str] = None,
                             session=None) -> Dict[str, Any]:
    """Insert a new rent distribution record for a property.

    The record starts with status ``pending`` and with ``total_investors``
    and ``payments_completed`` both set to 0.

    Returns:
        The cleaned inserted record.
    """
    print(f"[REPO] Creating rent distribution for property: {property_id}, amount: {total_rent_amount}")
    timestamp = _now()
    record = dict(
        property_id=_to_object_id(property_id),
        total_rent_amount=total_rent_amount,
        total_tokens=total_tokens,
        rent_per_token=rent_per_token,
        rent_period_start=rent_period_start,
        rent_period_end=rent_period_end,
        distribution_date=distribution_date,
        total_investors=0,
        payments_completed=0,
        status="pending",
        notes=notes,
        created_at=timestamp,
        updated_at=timestamp,
    )
    insert_result = rent_distributions_col(db).insert_one(record, session=session)
    record["_id"] = insert_result.inserted_id
    print(f"[REPO] Rent distribution created with ID: {insert_result.inserted_id}")
    return _clean_object(record)
def update_rent_distribution_status(db, distribution_id: str, status: str,
                                    total_investors: int, payments_completed: int,
                                    session=None) -> Optional[Dict[str, Any]]:
    """Update the status and progress counters of a rent distribution.

    Args:
        db: Database handle.
        distribution_id: Distribution ID string; must be a valid ObjectId.
        status: New status value.
        total_investors: Number of investors covered by the distribution.
        payments_completed: Number of payments completed so far.
        session: Optional MongoDB session forwarded to the update.

    Returns:
        The cleaned post-update document, or ``None`` when no distribution
        matches ``distribution_id``.
    """
    logger.info(
        "[REPO] Updating rent distribution %s: status=%s, payments=%s/%s",
        distribution_id, status, payments_completed, total_investors,
    )
    updated = rent_distributions_col(db).find_one_and_update(
        {"_id": _to_object_id(distribution_id)},
        {
            "$set": {
                "status": status,
                "total_investors": total_investors,
                "payments_completed": payments_completed,
                "updated_at": _now(),
            }
        },
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(updated) if updated else None
def create_rent_payment(db, distribution_id: str, user_id: str, property_id: str,
                        tokens_owned: int, rent_amount: float, rent_period_start: str,
                        rent_period_end: str, payment_status: str = "pending",
                        wallet_credited: bool = False, transaction_id: Optional[str] = None,
                        session=None) -> Dict[str, Any]:
    """Insert the rent payment record of one user for one distribution.

    ``payment_date`` is set to the creation time only when ``payment_status``
    is ``"completed"``; otherwise it is stored as ``None``. A falsy
    ``transaction_id`` is stored as ``None``.

    Returns:
        The cleaned inserted record.
    """
    print(f"[REPO] Creating rent payment for user: {user_id}, amount: {rent_amount}")
    timestamp = _now()
    is_completed = payment_status == "completed"
    record = {
        "distribution_id": _to_object_id(distribution_id),
        "user_id": _to_object_id(user_id),
        "property_id": _to_object_id(property_id),
        "tokens_owned": tokens_owned,
        "rent_amount": rent_amount,
        "rent_period_start": rent_period_start,
        "rent_period_end": rent_period_end,
        "payment_status": payment_status,
        "wallet_credited": wallet_credited,
        "transaction_id": _to_object_id(transaction_id) if transaction_id else None,
        "payment_date": timestamp if is_completed else None,
        "created_at": timestamp,
    }
    insert_result = rent_payments_col(db).insert_one(record, session=session)
    record["_id"] = insert_result.inserted_id
    print(f"[REPO] Rent payment created with ID: {insert_result.inserted_id}")
    return _clean_object(record)
def get_user_rent_payments(db, user_id: str, property_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return a user's rent payments sorted by ``payment_date`` descending.

    When ``property_id`` is truthy the result is limited to that property.
    """
    print(f"[REPO] Getting rent payments for user: {user_id}")
    criteria = {"user_id": _to_object_id(user_id)}
    if property_id:
        criteria["property_id"] = _to_object_id(property_id)
    cursor = rent_payments_col(db).find(criteria).sort("payment_date", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def get_rent_distribution_by_id(db, distribution_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one rent distribution by its ID.

    Returns the cleaned document, or ``None`` when there is no match.
    """
    found = rent_distributions_col(db).find_one({"_id": _to_object_id(distribution_id)})
    if not found:
        return None
    return _clean_object(found)
def get_user_rent_summary(db, user_id: str, property_id: str) -> Dict[str, Any]:
    """Summarise the completed rent payments of a user for one property.

    Only payments whose ``payment_status`` is ``"completed"`` count.

    Args:
        db: Database handle.
        user_id: User ID string.
        property_id: Property ID string.

    Returns:
        A dict with ``property_id``, ``total_rent_received`` (float),
        ``total_rent_payments``, ``last_rent_amount``, ``last_rent_date``
        and ``average_monthly_rent``. The last three are ``None`` when there
        is no completed payment. ``average_monthly_rent`` is the mean amount
        per completed payment.
    """
    logger.info("[REPO] Getting rent summary for user: %s, property: %s", user_id, property_id)
    payments = get_user_rent_payments(db, user_id, property_id)
    completed_payments = [p for p in payments if p.get("payment_status") == "completed"]

    if not completed_payments:
        return {
            "property_id": property_id,
            "total_rent_received": 0.0,
            "total_rent_payments": 0,
            "last_rent_amount": None,
            "last_rent_date": None,
            "average_monthly_rent": None,
        }

    # The float start value keeps the total a float; ``or 0.0`` tolerates a
    # rent_amount stored as None.
    total_rent = sum(((p.get("rent_amount") or 0.0) for p in completed_payments), 0.0)
    # get_user_rent_payments sorts by payment_date descending, so the first
    # completed entry is taken as the latest one.
    last_payment = completed_payments[0]
    return {
        "property_id": property_id,
        "total_rent_received": total_rent,
        "total_rent_payments": len(completed_payments),
        "last_rent_amount": last_payment.get("rent_amount"),
        "last_rent_date": last_payment.get("payment_date"),
        "average_monthly_rent": total_rent / len(completed_payments),
    }
| # ===================================================================== | |
| # SESSION MANAGEMENT | |
| # ===================================================================== | |
def sessions_col(db):
    """Return the ``sessions`` collection of ``db``."""
    collection = db.sessions
    return collection
def create_session(
    db,
    session_id: str,
    user_id: str,
    device_fingerprint: str,
    token: str,
    expires_at: datetime
) -> Dict[str, Any]:
    """
    Store a new, active authentication session.

    Args:
        db: Database connection
        session_id: Unique session identifier
        user_id: User's ID; must be a valid ObjectId string
        device_fingerprint: Device fingerprint hash
        token: JWT token
        expires_at: Session expiration timestamp

    Returns:
        The stored session document passed through ``_clean_object``
    """
    print(f"[REPO] Creating session for user: {user_id}")
    record = dict(
        session_id=session_id,
        user_id=_to_object_id(user_id),
        device_fingerprint=device_fingerprint,
        token=token,
        created_at=_now(),
        last_activity=_now(),
        expires_at=expires_at,
        is_active=True,
    )
    sessions_col(db).insert_one(record)
    return _clean_object(record)
def get_session_by_id(db, session_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the active session with the given ``session_id``.

    Returns the cleaned document, or ``None`` when no active session matches.
    """
    selector = {"session_id": session_id, "is_active": True}
    found = sessions_col(db).find_one(selector)
    if not found:
        return None
    return _clean_object(found)
def get_session_by_token(db, token: str) -> Optional[Dict[str, Any]]:
    """Fetch the active session that stores the given JWT ``token``.

    Returns the cleaned document, or ``None`` when no active session matches.
    """
    selector = {"token": token, "is_active": True}
    found = sessions_col(db).find_one(selector)
    if not found:
        return None
    return _clean_object(found)
def validate_session(
    db,
    session_id: str,
    device_fingerprint: str
) -> bool:
    """
    Check that a session is active, unexpired and bound to this device.

    An expired session is invalidated as a side effect. On success the
    session's ``last_activity`` timestamp is refreshed.

    Args:
        db: Database connection
        session_id: Session identifier
        device_fingerprint: Current device fingerprint

    Returns:
        True if session is valid, False otherwise
    """
    record = get_session_by_id(db, session_id)
    if not record:
        print(f"[REPO] Session not found: {session_id}")
        return False

    # Reject (and deactivate) sessions whose expiry has passed.
    expiry = record.get("expires_at")
    if expiry and expiry < _now():
        print(f"[REPO] Session expired: {session_id}")
        invalidate_session(db, session_id)
        return False

    # The session is only usable from the device it was created on.
    if record.get("device_fingerprint") != device_fingerprint:
        print(f"[REPO] Device fingerprint mismatch for session: {session_id}")
        return False

    touch = {"$set": {"last_activity": _now()}}
    sessions_col(db).update_one({"session_id": session_id}, touch)
    return True
def invalidate_session(db, session_id: str) -> bool:
    """Mark a session inactive (logout) and stamp ``invalidated_at``.

    Returns True when the update modified a document.
    """
    print(f"[REPO] Invalidating session: {session_id}")
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = sessions_col(db).update_one({"session_id": session_id}, deactivate)
    return outcome.modified_count > 0
def invalidate_user_sessions(db, user_id: str) -> int:
    """Deactivate every active session of a user (logout from all devices).

    Returns the number of sessions modified.
    """
    print(f"[REPO] Invalidating all sessions for user: {user_id}")
    active_for_user = {"user_id": _to_object_id(user_id), "is_active": True}
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = sessions_col(db).update_many(active_for_user, deactivate)
    return outcome.modified_count
def get_user_active_sessions(db, user_id: str) -> List[Dict[str, Any]]:
    """Return a user's active sessions, most recently active first."""
    selector = {"user_id": _to_object_id(user_id), "is_active": True}
    cursor = sessions_col(db).find(selector).sort("last_activity", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def cleanup_expired_sessions(db) -> int:
    """Deactivate active sessions whose ``expires_at`` is in the past.

    Intended to be run periodically. Returns the number of sessions modified.
    """
    print("[REPO] Cleaning up expired sessions")
    expired_and_active = {"expires_at": {"$lt": _now()}, "is_active": True}
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = sessions_col(db).update_many(expired_and_active, deactivate)
    return outcome.modified_count
| # ============================================================================ | |
| # SECONDARY MARKET TRANSACTIONS OPERATIONS | |
| # ============================================================================ | |
def secondary_market_col(db) -> Collection:
    """Return the ``secondary_market_transactions`` collection of ``db``."""
    collection_name = "secondary_market_transactions"
    return db[collection_name]
def create_market_transaction(
    db,
    transaction_id: str,
    transaction_type: str,
    property_id: str,
    property_title: str,
    token_currency: str,
    seller_id: str,
    seller_email: str,
    seller_xrp_address: str,
    buyer_id: str,
    buyer_email: str,
    buyer_xrp_address: str,
    tokens_amount: int,
    price_per_token: float,
    total_amount: float,
    blockchain_tx_hash: Optional[str] = None,
    notes: Optional[str] = None,
    session=None
) -> Dict[str, Any]:
    """
    Insert a secondary market transaction record.

    A record created with a ``blockchain_tx_hash`` is stored as ``completed``
    and blockchain-confirmed; without one it is stored as ``pending``. The
    four ``db_*`` bookkeeping flags always start as ``False`` and the
    currency is always ``AED``.

    Args:
        db: Database instance
        transaction_id: Unique transaction identifier
        transaction_type: Type of market transaction (sell_to_admin, etc.)
        property_id: Property ID
        property_title: Property title
        token_currency: Token currency code
        seller_id: Seller user ID
        seller_email: Seller email
        seller_xrp_address: Seller XRP address
        buyer_id: Buyer user ID (admin for sell_to_admin)
        buyer_email: Buyer email
        buyer_xrp_address: Buyer XRP address
        tokens_amount: Number of tokens traded
        price_per_token: Price per token
        total_amount: Total transaction amount
        blockchain_tx_hash: Blockchain transaction hash
        notes: Additional notes
        session: MongoDB session for transactions

    Returns:
        Created market transaction document
    """
    logger.info(f"[REPO] Creating market transaction: {transaction_id}")
    # A supplied hash means the trade is already confirmed on chain.
    has_hash = bool(blockchain_tx_hash)
    record = {
        "transaction_id": transaction_id,
        "transaction_type": transaction_type,
        "status": "completed" if has_hash else "pending",
        "property_id": _coerce_object_id(property_id),
        "property_title": property_title,
        "token_currency": token_currency,
        "seller_id": _coerce_object_id(seller_id),
        "seller_email": seller_email,
        "seller_xrp_address": seller_xrp_address,
        "buyer_id": _coerce_object_id(buyer_id),
        "buyer_email": buyer_email,
        "buyer_xrp_address": buyer_xrp_address,
        "tokens_amount": tokens_amount,
        "price_per_token": price_per_token,
        "total_amount": total_amount,
        "currency": "AED",
        "blockchain_tx_hash": blockchain_tx_hash,
        "blockchain_confirmed": has_hash,
        "blockchain_confirmed_at": _now() if has_hash else None,
        "db_investment_updated": False,
        "db_wallet_updated": False,
        "db_property_updated": False,
        "db_transaction_recorded": False,
        "initiated_at": _now(),
        "completed_at": _now() if has_hash else None,
        "notes": notes,
        "error_message": None,
    }
    insert_result = secondary_market_col(db).insert_one(record, session=session)
    record["_id"] = insert_result.inserted_id
    logger.info(f"[REPO] Market transaction created: {transaction_id}")
    return _clean_object(record)
def update_market_transaction_status(
    db,
    transaction_id: str,
    status: str,
    blockchain_tx_hash: Optional[str] = None,
    error_message: Optional[str] = None,
    db_updates: Optional[Dict[str, bool]] = None,
    session=None
) -> Optional[Dict[str, Any]]:
    """
    Update the status and related details of a market transaction.

    Args:
        db: Database instance
        transaction_id: Transaction ID to update
        status: New status (pending, completed, failed, etc.)
        blockchain_tx_hash: Blockchain transaction hash; when given, the
            record is also marked blockchain-confirmed
        error_message: Error message if failed
        db_updates: Flags keyed by investment_updated, wallet_updated,
            property_updated and transaction_recorded; each is stored under
            the matching ``db_``-prefixed field
        session: MongoDB session for transactions

    Returns:
        Updated market transaction document or None
    """
    logger.info(f"[REPO] Updating market transaction {transaction_id} to status: {status}")
    changes = {"status": status}

    if blockchain_tx_hash:
        changes.update(
            blockchain_tx_hash=blockchain_tx_hash,
            blockchain_confirmed=True,
            blockchain_confirmed_at=_now(),
        )
    if error_message:
        changes["error_message"] = error_message
    if status == "completed":
        changes["completed_at"] = _now()

    if db_updates:
        # Caller-facing flag name -> stored field name.
        flag_fields = (
            ("investment_updated", "db_investment_updated"),
            ("wallet_updated", "db_wallet_updated"),
            ("property_updated", "db_property_updated"),
            ("transaction_recorded", "db_transaction_recorded"),
        )
        for flag_name, stored_field in flag_fields:
            if flag_name in db_updates:
                changes[stored_field] = db_updates[flag_name]

    updated = secondary_market_col(db).find_one_and_update(
        {"transaction_id": transaction_id},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not updated:
        logger.warning(f"[REPO] Market transaction {transaction_id} not found")
        return None
    logger.info(f"[REPO] Market transaction {transaction_id} updated successfully")
    return _clean_object(updated)
def get_user_market_history(
    db,
    user_id: str,
    transaction_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    Get a user's secondary market transaction history.

    A transaction is included when the user is its seller or its buyer.

    Args:
        db: Database instance
        user_id: User ID
        transaction_type: Filter by transaction type (optional)
        status: Filter by status (optional)
        limit: Maximum number of records to return

    Returns:
        List of cleaned market transactions, newest ``initiated_at`` first
    """
    logger.info("[REPO] Fetching market history for user: %s", user_id)
    # Coerce once: the same value is used for both roles, and a non-ObjectId
    # id then triggers a single warning instead of two.
    user_ref = _coerce_object_id(user_id)
    query: Dict[str, Any] = {
        "$or": [
            {"seller_id": user_ref},
            {"buyer_id": user_ref},
        ]
    }
    if transaction_type:
        query["transaction_type"] = transaction_type
    if status:
        query["status"] = status

    transactions = list(
        secondary_market_col(db)
        .find(query)
        .sort("initiated_at", DESCENDING)
        .limit(limit)
    )
    logger.info("[REPO] Found %d market transactions for user %s", len(transactions), user_id)
    return [_clean_object(tx) for tx in transactions]
def get_property_market_history(
    db,
    property_id: str,
    status: Optional[str] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    List the secondary market transactions recorded for a property.

    Args:
        db: Database instance
        property_id: Property ID
        status: Filter by status (optional)
        limit: Maximum number of records to return

    Returns:
        Cleaned market transactions for the property, newest
        ``initiated_at`` first
    """
    logger.info(f"[REPO] Fetching market history for property: {property_id}")
    criteria = {"property_id": _coerce_object_id(property_id)}
    if status:
        criteria["status"] = status

    cursor = secondary_market_col(db).find(criteria)
    matches = list(cursor.sort("initiated_at", DESCENDING).limit(limit))
    logger.info(f"[REPO] Found {len(matches)} market transactions for property {property_id}")
    return [_clean_object(entry) for entry in matches]
def get_market_transaction(db, transaction_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a single market transaction by its ``transaction_id`` field.

    Args:
        db: Database instance
        transaction_id: Transaction ID

    Returns:
        Cleaned market transaction document, or None when nothing matches
    """
    record = secondary_market_col(db).find_one({"transaction_id": transaction_id})
    return _clean_object(record) if record else None
| # ============================================================================ | |
| # CARDS OPERATIONS (Unified for both investor and admin) | |
| # ============================================================================ | |
def cards_col(db) -> Collection:
    """Return the ``cards`` collection (shared by investor and admin records)."""
    collection_name = "cards"
    return db[collection_name]
| def _detect_card_type(card_number: str) -> str: | |
| """Detect card type from card number""" | |
| try: | |
| if card_number.startswith('4'): | |
| return 'visa' | |
| elif card_number.startswith(('51', '52', '53', '54', '55')) or (int(card_number[:6]) >= 222100 and int(card_number[:6]) <= 272099): | |
| return 'mastercard' | |
| elif card_number.startswith(('34', '37')): | |
| return 'amex' | |
| elif card_number.startswith('6011') or card_number.startswith('65'): | |
| return 'discover' | |
| except: | |
| pass | |
| return 'unknown' | |
def create_card(
    db,
    user_id: str,
    user_role: str,  # "investor" or "admin"
    card_number: str,
    expiry_month: int,
    expiry_year: int,
    cardholder_name: str,
    bank_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create a new card for user/admin (stores only last 4 digits for security)

    Args:
        db: Database instance
        user_id: User's ID
        user_role: "investor" or "admin"
        card_number: Full card number (will only store last 4); spaces and
            hyphens are ignored
        expiry_month: Expiry month (1-12)
        expiry_year: Expiry year (YYYY)
        cardholder_name: Name on card
        bank_name: Issuing bank name (optional)

    Returns:
        Created card document (secure - no full card number)

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId, or an active card
            with the same last four digits and expiry already exists for this
            user and role.
    """
    logger.info(f"[REPO] Creating card for {user_role} user: {user_id}")

    # Strip display separators before slicing: otherwise trailing whitespace or
    # a hyphen can end up in the stored last four and defeat the duplicate check.
    normalized_number = "".join(card_number.split()).replace("-", "")

    # Detect card type from full number
    card_type = _detect_card_type(normalized_number)

    # Store only last 4 digits
    last_four = normalized_number[-4:]

    # Convert once; raises ValueError for a malformed user id.
    owner_id = _to_object_id(user_id)
    collection = cards_col(db)

    # Check if card already exists for this user+role
    existing = collection.find_one({
        "user_id": owner_id,
        "user_role": user_role,
        "last_four": last_four,
        "expiry_month": expiry_month,
        "expiry_year": expiry_year,
        "is_active": True
    })
    if existing:
        logger.warning(f"[REPO] Card with last four {last_four} already exists for {user_role} {user_id}")
        raise ValueError("Card already exists")

    # Check if this should be default (first card for this user+role)
    existing_count = collection.count_documents({
        "user_id": owner_id,
        "user_role": user_role,
        "is_active": True
    })
    is_default = existing_count == 0

    now = _now()
    card_doc = {
        "user_id": owner_id,
        "user_role": user_role,
        "card_type": card_type,
        "last_four": last_four,
        "cardholder_name": cardholder_name,
        "bank_name": bank_name or None,
        "expiry_month": expiry_month,
        "expiry_year": expiry_year,
        "is_default": is_default,
        "is_active": True,
        "created_at": now,
        "updated_at": now
    }
    result = collection.insert_one(card_doc)
    card_doc["_id"] = result.inserted_id
    logger.info(f"[REPO] Card created with ID: {result.inserted_id}")
    return _clean_object(card_doc)
def get_cards(db, user_id: str, user_role: str) -> List[Dict[str, Any]]:
    """List the active cards of a user/admin, sorted by ``created_at`` descending."""
    logger.info(f"[REPO] Getting cards for {user_role} user: {user_id}")

    criteria = {
        "user_id": _to_object_id(user_id),
        "user_role": user_role,
        "is_active": True,
    }
    cursor = cards_col(db).find(criteria).sort("created_at", DESCENDING)
    active_cards = list(cursor)
    return list(map(_clean_object, active_cards))
def delete_card(db, user_id: str, user_role: str, card_id: str) -> bool:
    """
    Soft delete a user's/admin's card.

    The card is marked inactive and loses its default flag. If it was the
    default, the most recently created remaining active card for the same
    user and role is promoted to default.

    Args:
        db: Database instance
        user_id: Owner's ID
        user_role: "investor" or "admin"
        card_id: ID of the card to delete

    Returns:
        True if an active card owned by this user/role was deleted, False if
        no such active card exists (including when it was already deleted).

    Raises:
        ValueError: If ``card_id`` or ``user_id`` is not a valid ObjectId.
    """
    logger.info(f"[REPO] Deleting card {card_id} for {user_role} user {user_id}")

    collection = cards_col(db)
    card_oid = _to_object_id(card_id)
    owner_id = _to_object_id(user_id)

    # Only match cards that are still active, and clear is_default in the same
    # write. Previously a repeated delete still modified updated_at and, because
    # the deleted card kept is_default=True, promoted a second default card.
    # find_one_and_update returns the pre-update document by default, so the
    # prior is_default value is read atomically with the delete.
    removed = collection.find_one_and_update(
        {
            "_id": card_oid,
            "user_id": owner_id,
            "user_role": user_role,
            "is_active": True
        },
        {"$set": {"is_active": False, "is_default": False, "updated_at": _now()}}
    )
    if removed is None:
        return False

    # If deleted card was default, set another card as default
    if removed.get("is_default"):
        successor = collection.find_one(
            {
                "user_id": owner_id,
                "user_role": user_role,
                "is_active": True
            },
            # Explicit order so the promoted card is deterministic.
            sort=[("created_at", DESCENDING)]
        )
        if successor:
            collection.update_one(
                {"_id": successor["_id"]},
                {"$set": {"is_default": True}}
            )
    return True
def set_default_card(db, user_id: str, user_role: str, card_id: str) -> bool:
    """
    Set a card as the default payment method.

    Args:
        db: Database instance
        user_id: Owner's ID
        user_role: "investor" or "admin"
        card_id: ID of the card to make default

    Returns:
        True if the card exists, is active and belongs to this user/role (it
        is then the only default); False otherwise, in which case the existing
        default is left untouched.

    Raises:
        ValueError: If ``card_id`` or ``user_id`` is not a valid ObjectId.
    """
    logger.info(f"[REPO] Setting card {card_id} as default for {user_role} user {user_id}")

    # Validate both ids before any write so a malformed id cannot leave the
    # user with their previous default already cleared.
    card_oid = _to_object_id(card_id)
    owner_id = _to_object_id(user_id)
    collection = cards_col(db)

    # Set new default first; only touch other cards once the target is known
    # to exist. matched_count (not modified_count) is used so that re-selecting
    # the current default still reports success.
    result = collection.update_one(
        {
            "_id": card_oid,
            "user_id": owner_id,
            "user_role": user_role,
            "is_active": True
        },
        {"$set": {"is_default": True}}
    )
    if result.matched_count == 0:
        return False

    # Unset every other default for this user+role
    collection.update_many(
        {
            "user_id": owner_id,
            "user_role": user_role,
            "is_default": True,
            "_id": {"$ne": card_oid}
        },
        {"$set": {"is_default": False}}
    )
    return True
| # Legacy aliases for backward compatibility | |
def create_user_card(db, user_id, card_number, expiry_month, expiry_year, cardholder_name, bank_name=None):
    """Legacy alias: create a card with the role fixed to "investor"."""
    return create_card(
        db=db,
        user_id=user_id,
        user_role="investor",
        card_number=card_number,
        expiry_month=expiry_month,
        expiry_year=expiry_year,
        cardholder_name=cardholder_name,
        bank_name=bank_name,
    )
def get_user_cards(db, user_id):
    """Legacy alias: list cards with the role fixed to "investor"."""
    return get_cards(db=db, user_id=user_id, user_role="investor")
def delete_user_card(db, user_id, card_id):
    """Legacy alias: soft delete a card with the role fixed to "investor"."""
    return delete_card(db=db, user_id=user_id, user_role="investor", card_id=card_id)
| # ============================================================================ | |
| # BANKS OPERATIONS (Unified for both investor and admin) | |
| # ============================================================================ | |
def banks_col(db) -> Collection:
    """Return the ``banks`` collection (shared by investor and admin records)."""
    collection_name = "banks"
    return db[collection_name]
def create_bank(
    db,
    user_id: str,
    user_role: str,  # "investor" or "admin"
    bank_name: str,
    account_holder_name: str,
    account_number: str,
    account_type: str = "savings",
    iban: Optional[str] = None,
    swift_code: Optional[str] = None,
    currency: str = "AED"
) -> Dict[str, Any]:
    """
    Register a bank account for a user/admin.

    The returned document exposes only the last four characters of the
    account number and IBAN; the full values are stored but never returned.

    Args:
        db: Database instance
        user_id: User's ID
        user_role: "investor" or "admin"
        bank_name: Bank name
        account_holder_name: Account holder name
        account_number: Full account number
        account_type: Account type (savings, current, business)
        iban: IBAN (optional)
        swift_code: SWIFT code (optional)
        currency: Currency code (default AED)

    Returns:
        Created bank document without the full account number / IBAN and
        without any None-valued fields

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId, or an active
            account with the same bank name and last four digits already
            exists for this user and role.
    """
    logger.info(f"[REPO] Creating bank account for {user_role} user: {user_id}")

    # Display-safe suffixes
    account_suffix = account_number[-4:]
    iban_suffix = iban[-4:] if iban else None

    owner_id = _to_object_id(user_id)
    collection = banks_col(db)

    # Reject a duplicate active account for the same user+role
    duplicate = collection.find_one({
        "user_id": owner_id,
        "user_role": user_role,
        "account_number_last_four": account_suffix,
        "bank_name": bank_name,
        "is_active": True
    })
    if duplicate:
        logger.warning(f"[REPO] Bank account already exists for {user_role} user {user_id}")
        raise ValueError("Bank account already exists")

    # The first active account for this user+role becomes the default
    active_count = collection.count_documents({
        "user_id": owner_id,
        "user_role": user_role,
        "is_active": True
    })

    timestamp = _now()
    bank_doc = {
        "user_id": owner_id,
        "user_role": user_role,
        "bank_name": bank_name,
        "account_holder_name": account_holder_name,
        "account_number_last_four": account_suffix,
        # NOTE: stored as plaintext despite the field name; encrypt in production
        "account_number_encrypted": account_number,
        "iban_last_four": iban_suffix or None,
        # NOTE: stored as plaintext despite the field name; encrypt in production
        "iban_encrypted": iban or None,
        "swift_code": swift_code or None,
        "account_type": account_type,
        "currency": currency,
        "is_default": active_count == 0,
        "is_verified": False,  # Can be verified via microdeposits
        "is_active": True,
        "created_at": timestamp,
        "updated_at": timestamp
    }
    inserted = collection.insert_one(bank_doc)
    bank_doc["_id"] = inserted.inserted_id

    # Build the response: drop the full-value fields and any None entries
    hidden_fields = ("account_number_encrypted", "iban_encrypted")
    public_doc = {
        key: value
        for key, value in bank_doc.items()
        if key not in hidden_fields and value is not None
    }

    logger.info(f"[REPO] Bank account created with ID: {inserted.inserted_id}")
    return _clean_object(public_doc)
def get_banks(db, user_id: str, user_role: str) -> List[Dict[str, Any]]:
    """
    List the active bank accounts of a user/admin, sorted by ``created_at`` descending.

    The full account number / IBAN fields and any None-valued top-level
    fields are omitted from each returned document.
    """
    logger.info(f"[REPO] Getting bank accounts for {user_role} user: {user_id}")

    criteria = {
        "user_id": _to_object_id(user_id),
        "user_role": user_role,
        "is_active": True,
    }
    stored_accounts = list(banks_col(db).find(criteria).sort("created_at", DESCENDING))

    hidden_fields = ("account_number_encrypted", "iban_encrypted")

    def _public_view(document):
        # Clean first, then strip the full-value fields and None entries
        cleaned = _clean_object(document)
        return {
            key: value
            for key, value in cleaned.items()
            if key not in hidden_fields and value is not None
        }

    return [_public_view(account) for account in stored_accounts]
def delete_bank(db, user_id: str, user_role: str, bank_id: str) -> bool:
    """
    Soft delete a user's/admin's bank account.

    The account is marked inactive and loses its default flag. If it was the
    default, the most recently created remaining active account for the same
    user and role is promoted to default.

    Args:
        db: Database instance
        user_id: Owner's ID
        user_role: "investor" or "admin"
        bank_id: ID of the bank account to delete

    Returns:
        True if an active account owned by this user/role was deleted, False
        if no such active account exists (including when it was already
        deleted).

    Raises:
        ValueError: If ``bank_id`` or ``user_id`` is not a valid ObjectId.
    """
    logger.info(f"[REPO] Deleting bank {bank_id} for {user_role} user {user_id}")

    collection = banks_col(db)
    bank_oid = _to_object_id(bank_id)
    owner_id = _to_object_id(user_id)

    # Only match accounts that are still active, and clear is_default in the
    # same write. Previously a repeated delete still modified updated_at and,
    # because the deleted account kept is_default=True, promoted a second
    # default. find_one_and_update returns the pre-update document by default,
    # so the prior is_default value is read atomically with the delete.
    removed = collection.find_one_and_update(
        {
            "_id": bank_oid,
            "user_id": owner_id,
            "user_role": user_role,
            "is_active": True
        },
        {"$set": {"is_active": False, "is_default": False, "updated_at": _now()}}
    )
    if removed is None:
        return False

    # If deleted bank was default, set another bank as default
    if removed.get("is_default"):
        successor = collection.find_one(
            {
                "user_id": owner_id,
                "user_role": user_role,
                "is_active": True
            },
            # Explicit order so the promoted account is deterministic.
            sort=[("created_at", DESCENDING)]
        )
        if successor:
            collection.update_one(
                {"_id": successor["_id"]},
                {"$set": {"is_default": True}}
            )
    return True
def set_default_bank(db, user_id: str, user_role: str, bank_id: str) -> bool:
    """
    Set a bank account as the default for withdrawals.

    Args:
        db: Database instance
        user_id: Owner's ID
        user_role: "investor" or "admin"
        bank_id: ID of the bank account to make default

    Returns:
        True if the account exists, is active and belongs to this user/role
        (it is then the only default); False otherwise, in which case the
        existing default is left untouched.

    Raises:
        ValueError: If ``bank_id`` or ``user_id`` is not a valid ObjectId.
    """
    logger.info(f"[REPO] Setting bank {bank_id} as default for {user_role} user {user_id}")

    # Validate both ids before any write so a malformed id cannot leave the
    # user with their previous default already cleared.
    bank_oid = _to_object_id(bank_id)
    owner_id = _to_object_id(user_id)
    collection = banks_col(db)

    # Set new default first; only touch other accounts once the target is known
    # to exist. matched_count (not modified_count) is used so that re-selecting
    # the current default still reports success.
    result = collection.update_one(
        {
            "_id": bank_oid,
            "user_id": owner_id,
            "user_role": user_role,
            "is_active": True
        },
        {"$set": {"is_default": True}}
    )
    if result.matched_count == 0:
        return False

    # Unset every other default for this user+role
    collection.update_many(
        {
            "user_id": owner_id,
            "user_role": user_role,
            "is_default": True,
            "_id": {"$ne": bank_oid}
        },
        {"$set": {"is_default": False}}
    )
    return True
| # Legacy aliases for backward compatibility | |
def create_user_bank(db, user_id, bank_name, account_holder_name, account_number, account_type="savings", iban=None, swift_code=None, currency="AED"):
    """Legacy alias: create a bank account with the role fixed to "investor"."""
    return create_bank(
        db=db,
        user_id=user_id,
        user_role="investor",
        bank_name=bank_name,
        account_holder_name=account_holder_name,
        account_number=account_number,
        account_type=account_type,
        iban=iban,
        swift_code=swift_code,
        currency=currency,
    )
def get_user_banks(db, user_id):
    """Legacy alias: list bank accounts with the role fixed to "investor"."""
    return get_banks(db=db, user_id=user_id, user_role="investor")
def delete_user_bank(db, user_id, bank_id):
    """Legacy alias: soft delete a bank account with the role fixed to "investor"."""
    return delete_bank(db=db, user_id=user_id, user_role="investor", bank_id=bank_id)