# atriumchain-api / repo.py
# Jainish1808's picture
# Upload folder using huggingface_hub
# 4e4664a verified
"""
Production-Ready Repository Layer for Real Estate Tokenization Platform
Implements the complete database schema with MongoDB
"""
from datetime import datetime
from typing import Optional, List, Dict, Any
import logging
from pymongo.collection import Collection
from pymongo import ReturnDocument, ASCENDING, DESCENDING
from bson import ObjectId
from db import get_db, get_next_sequence
from config import settings
from utils.logger import setup_logger
logger = setup_logger(__name__)
def _now():
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the value is
    read from an aware UTC clock and the tzinfo is stripped. The result is
    therefore the same kind of naive UTC timestamp callers received before.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)
def _clean_object(data):
    """Recursively turn a MongoDB result into plain, string-id data.

    Dicts get their ``_id`` key renamed to ``id`` (stringified when it is an
    ObjectId, otherwise kept as is); all other dict values are cleaned
    recursively. Lists are cleaned element by element, a bare ObjectId
    becomes its string form, and anything else is returned untouched.
    """
    if isinstance(data, ObjectId):
        return str(data)
    if isinstance(data, list):
        return [_clean_object(entry) for entry in data]
    if not isinstance(data, dict):
        return data
    result = {}
    for field, raw in data.items():
        if field != "_id":
            result[field] = _clean_object(raw)
        elif isinstance(raw, ObjectId):
            result["id"] = str(raw)
        else:
            # Non-ObjectId primary keys are passed through unchanged.
            result["id"] = raw
    return result
def _to_object_id(id_str: str) -> ObjectId:
    """Convert a string ID into an ``ObjectId``.

    Args:
        id_str: Value to convert.

    Returns:
        The corresponding ``ObjectId``.

    Raises:
        ValueError: If the value cannot be converted. The original error is
            chained as the cause so the real reason is not lost.
    """
    try:
        return ObjectId(id_str)
    except Exception as exc:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are no longer rewritten into ValueError.
        raise ValueError(f"Invalid ObjectId: {id_str}") from exc
def _coerce_object_id(id_value):
    """Best-effort ObjectId conversion.

    ``None`` and existing ObjectIds are returned as they are. Anything else
    is converted when possible; on failure a warning is logged and the
    original value is returned unchanged.
    """
    needs_conversion = not (id_value is None or isinstance(id_value, ObjectId))
    if needs_conversion:
        try:
            converted = ObjectId(id_value)
        except Exception:
            logger.warning("Invalid ObjectId %r; storing plain value", id_value)
        else:
            return converted
    return id_value
# ============================================================================
# USER OPERATIONS
# ============================================================================
def users_col(db) -> Collection:
    """Return the ``users`` collection of ``db``."""
    return db["users"]
def create_user(db, name: str, email: str, password_hash: str, country_code: str, phone: str, role: str = "user") -> Dict[str, Any]:
    """Insert a new user document and return it in cleaned form.

    The user starts active, not deleted and without a wallet reference;
    ``created_at`` and ``updated_at`` share the same timestamp.
    """
    logger.info(f"Creating user: {email} with role: {role}")
    timestamp = _now()
    user_doc = dict(
        name=name,
        email=email,
        password_hash=password_hash,
        role=role,
        country_code=country_code,
        phone=phone,
        wallet_id=None,
        is_active=True,
        deleted=False,
        created_at=timestamp,
        updated_at=timestamp,
    )
    new_id = users_col(db).insert_one(user_doc).inserted_id
    user_doc["_id"] = new_id
    logger.info(f"User created successfully with ID: {new_id}")
    return _clean_object(user_doc)
def get_user_by_email(db, email: str) -> Optional[Dict[str, Any]]:
    """Look up a non-deleted user by email; return ``None`` when absent."""
    found = users_col(db).find_one({"email": email, "deleted": False})
    if not found:
        return None
    return _clean_object(found)
def get_user_by_id(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Look up a non-deleted user by ID; return ``None`` when absent.

    Raises ``ValueError`` (from ``_to_object_id``) for an unusable ID.
    """
    lookup = {"_id": _to_object_id(user_id), "deleted": False}
    found = users_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def get_user_by_role(db, role: str) -> Optional[Dict[str, Any]]:
    """Return one active, non-deleted user holding ``role`` (e.g. 'admin', 'super_admin').

    Which user is returned when several match is whatever ``find_one`` yields.
    """
    lookup = {"role": role, "deleted": False, "is_active": True}
    found = users_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def update_user(db, user_id: str, fields: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to a user via ``$set`` and return the updated document.

    Args:
        db: Database handle.
        user_id: ID of the user to update.
        fields: Field/value pairs to set. The dict is not modified;
            ``updated_at`` is added to a copy.

    Returns:
        The cleaned updated user, or ``None`` when no user has that ID.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId.
    """
    logger.info("Updating user %s with fields: %s", user_id, list(fields.keys()))
    # Copy so the caller's dict does not silently gain an ``updated_at`` key.
    changes = {**fields, "updated_at": _now()}
    doc = users_col(db).find_one_and_update(
        {"_id": _to_object_id(user_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
    )
    if not doc:
        return None
    logger.info("User %s updated successfully", user_id)
    return _clean_object(doc)
def list_all_users(db, skip: int = 0, limit: int = 100, fetch_live_xrp: bool = True) -> List[Dict[str, Any]]:
    """List regular users (role == 'user') with wallet balances attached.

    Admins and deleted users are excluded. Every returned user carries
    ``aed_balance``, ``xrp_address`` and ``xrp_balance``; users without a
    wallet get 0.0 / None / 0.0.

    Args:
        db: Database connection
        skip: Number of records to skip
        limit: Maximum number of records to return
        fetch_live_xrp: If True, fetch the live XRP balance from the
            blockchain when no cached value is available (slower).

    Returns:
        Cleaned user dicts, newest first.
    """
    user_docs = list(
        users_col(db)
        .find({"deleted": False, "role": "user"})
        .skip(skip)
        .limit(limit)
        .sort("created_at", DESCENDING)
    )
    if not user_docs:
        return []

    # Fetch all wallets for this page in one query instead of one per user.
    wallets_by_user = {}
    page_user_ids = [user_doc["_id"] for user_doc in user_docs]
    for wallet_doc in wallets_col(db).find({"user_id": {"$in": page_user_ids}}):
        # Keep the first wallet seen per user, like the previous find_one did.
        wallets_by_user.setdefault(wallet_doc["user_id"], wallet_doc)

    users_with_wallets = []
    for user_doc in user_docs:
        user = _clean_object(user_doc)
        wallet_doc = wallets_by_user.get(user_doc["_id"])
        if wallet_doc:
            # wallet.balance is the source of truth for the AED balance.
            user["aed_balance"] = wallet_doc.get("balance", 0.0)
            user["xrp_address"] = wallet_doc.get("xrp_address")
            user["xrp_balance"] = _resolve_xrp_balance(db, wallet_doc, fetch_live_xrp)
        else:
            # No wallet exists yet
            user["aed_balance"] = 0.0
            user["xrp_address"] = None
            user["xrp_balance"] = 0.0
        users_with_wallets.append(user)
    return users_with_wallets


def _resolve_xrp_balance(db, wallet_doc, fetch_live_xrp):
    """Pick the XRP balance for a wallet: cache helper, stored cache, then live lookup."""
    address = wallet_doc.get("xrp_address")
    if not address:
        return 0.0

    cached_balance = None
    try:
        from utils.cache import get_cached_xrp_balance
        cached_balance = get_cached_xrp_balance(address)
    except ImportError:
        # The cache module is optional; fall through to the other sources.
        logger.debug("utils.cache unavailable; skipping cached XRP balance lookup")
    if cached_balance is not None:
        return cached_balance

    # NOTE(review): a stored balance of 0 is falsy and therefore ignored here,
    # exactly as before; confirm whether that is intended.
    if wallet_doc.get("xrp_balance_cached"):
        return wallet_doc.get("xrp_balance_cached", 0.0)

    if not fetch_live_xrp:
        return 0.0

    try:
        from services.xrp_service import XRPLService
        from utils.cache import cache_xrp_balance

        live_xrp_balance = XRPLService().get_xrp_balance(address)
        # Cache the result for next time, then mirror it onto the wallet document.
        cache_xrp_balance(address, live_xrp_balance, ttl=300)
        wallets_col(db).update_one(
            {"_id": wallet_doc["_id"]},
            {"$set": {"xrp_balance_cached": live_xrp_balance, "balance_updated_at": _now()}},
        )
        return live_xrp_balance
    except Exception as e:
        # Best effort: any failure in the live path reports a zero balance.
        logger.warning(f"Failed to fetch live XRP balance for {address}: {e}")
        return 0.0
# ============================================================================
# ADMIN USER HELPERS
# ============================================================================
def get_primary_admin(db, session=None) -> Optional[Dict[str, Any]]:
    """Return one non-deleted user whose role is 'admin', or ``None``.

    ``session`` is forwarded to the query; passing ``None`` is the same as
    not supplying a session at all.
    """
    admin = users_col(db).find_one({"role": "admin", "deleted": False}, session=session or None)
    if not admin:
        return None
    return _clean_object(admin)
# ============================================================================
# WALLET OPERATIONS
# ============================================================================
def wallets_col(db) -> Collection:
    """Return the ``wallets`` collection of ``db``."""
    return db["wallets"]
def create_wallet(db, user_id: str, balance: float = 0.0, currency: str = "AED", xrp_address: str = None, xrp_seed: str = None) -> Dict[str, Any]:
    """Insert a wallet for ``user_id`` and link it from the user document.

    Two writes happen: the wallet insert, then a ``$set`` of ``wallet_id`` on
    the user. Returns the cleaned wallet document.
    """
    print(f"[REPO] Creating wallet for user {user_id}, balance: {balance} {currency}")
    timestamp = _now()
    owner_id = _to_object_id(user_id)
    # NOTE(review): the seed is stored exactly as received and is part of the
    # returned document; the original comment says it must be encrypted in production.
    wallet_doc = dict(
        user_id=owner_id,
        balance=balance,
        currency=currency,
        xrp_address=xrp_address,
        xrp_seed=xrp_seed,
        is_active=True,
        created_at=timestamp,
        updated_at=timestamp,
    )
    wallet_id = wallets_col(db).insert_one(wallet_doc).inserted_id
    wallet_doc["_id"] = wallet_id
    # Point the user at the freshly created wallet.
    link_update = {"$set": {"wallet_id": wallet_id, "updated_at": timestamp}}
    users_col(db).update_one({"_id": _to_object_id(user_id)}, link_update)
    print(f"[REPO] Wallet created successfully with ID: {wallet_id}")
    return _clean_object(wallet_doc)
def get_wallet_by_user(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Return the cleaned wallet owned by ``user_id``, or ``None`` if there is none."""
    wallet = wallets_col(db).find_one({"user_id": _to_object_id(user_id)})
    if not wallet:
        return None
    return _clean_object(wallet)
def update_wallet_balance(db, wallet_id: str, amount: float, operation: str = "add", session=None) -> Optional[Dict[str, Any]]:
    """Atomically adjust a wallet balance with ``$inc``.

    ``operation == "add"`` credits ``amount``; every other value debits it.
    Returns the cleaned wallet after the change, or ``None`` when the wallet
    does not exist.
    """
    print(f"[REPO] {operation.capitalize()}ing {amount} to wallet {wallet_id}")
    # NOTE(review): any operation string other than "add" (typos included)
    # subtracts, and no floor is enforced on the resulting balance.
    if operation == "add":
        signed_amount = amount
    else:
        signed_amount = -amount
    change = {
        "$inc": {"balance": signed_amount},
        "$set": {"updated_at": _now()},
    }
    updated = wallets_col(db).find_one_and_update(
        {"_id": _to_object_id(wallet_id)},
        change,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not updated:
        return None
    print(f"[REPO] Wallet balance updated. New balance: {updated.get('balance')}")
    return _clean_object(updated)
def update_wallet_xrp(db, wallet_id: str, xrp_address: str, xrp_seed: str) -> Optional[Dict[str, Any]]:
    """Set the XRP address and seed on a wallet.

    Returns the cleaned wallet after the update, or ``None`` when no wallet
    has that ID.
    """
    print(f"[REPO] Adding XRP wallet to wallet {wallet_id}: {xrp_address}")
    # NOTE(review): the seed is written as received; the original comment
    # says it should be encrypted in production.
    xrp_fields = {
        "xrp_address": xrp_address,
        "xrp_seed": xrp_seed,
        "updated_at": _now(),
    }
    updated = wallets_col(db).find_one_and_update(
        {"_id": _to_object_id(wallet_id)},
        {"$set": xrp_fields},
        return_document=ReturnDocument.AFTER,
    )
    if not updated:
        return None
    print("[REPO] XRP wallet added successfully")
    return _clean_object(updated)
# ============================================================================
# PROPERTY OPERATIONS
# ============================================================================
def properties_col(db) -> Collection:
    """Return the ``properties`` collection of ``db``."""
    return db["properties"]
def create_property(db, property_data: Dict[str, Any], created_by: str) -> Dict[str, Any]:
    """Insert a property built from ``property_data`` and return it cleaned.

    The repository-controlled fields (``available_tokens`` seeded from
    ``total_tokens``, creator, flags, timestamps) override any same-named
    keys in ``property_data``.
    """
    print(f"[REPO] Creating property: {property_data.get('title')}")
    timestamp = _now()
    property_doc = dict(property_data)
    property_doc.update({
        "available_tokens": property_data.get("total_tokens", 0),
        "created_by": _coerce_object_id(created_by),
        "is_active": True,
        "deleted": False,
        "created_at": timestamp,
        "updated_at": timestamp,
    })
    new_id = properties_col(db).insert_one(property_doc).inserted_id
    property_doc["_id"] = new_id
    print(f"[REPO] Property created successfully with ID: {new_id}")
    return _clean_object(property_doc)
def get_property_by_id(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Return the cleaned non-deleted property with this ID, or ``None``."""
    lookup = {"_id": _to_object_id(property_id), "deleted": False}
    found = properties_col(db).find_one(lookup)
    if not found:
        return None
    return _clean_object(found)
def get_property_by_id_optimized(db, property_id: str) -> Optional[Dict[str, Any]]:
    """
    Fetch one non-deleted property plus its related data in a single aggregation.

    The result carries ``creator``, ``specifications`` (each unwound to a
    single object when present), and the arrays ``amenities``, ``images``
    (both limited to active entries) and ``documents``. Returns ``None`` when
    the property does not exist.
    """
    def related(collection_name, alias, active_only=False):
        # $lookup of child documents whose property_id equals this property's _id.
        spec = {
            "from": collection_name,
            "localField": "_id",
            "foreignField": "property_id",
            "as": alias,
        }
        if active_only:
            spec["pipeline"] = [{"$match": {"is_active": True}}]
        return {"$lookup": spec}

    def single(alias):
        # Turn a one-element join array into an object, keeping rows without a match.
        return {"$unwind": {"path": f"${alias}", "preserveNullAndEmptyArrays": True}}

    # NOTE(review): the creator join returns the complete user document
    # (password_hash included); confirm callers strip it before responding.
    creator_join = {"$lookup": {
        "from": "users",
        "localField": "created_by",
        "foreignField": "_id",
        "as": "creator",
    }}
    stages = [
        {"$match": {"_id": _to_object_id(property_id), "deleted": False}},
        creator_join,
        single("creator"),
        related("property_specifications", "specifications"),
        single("specifications"),
        related("amenities", "amenities", active_only=True),
        related("property_images", "images", active_only=True),
        related("documents", "documents"),
    ]
    matches = list(properties_col(db).aggregate(stages))
    if not matches:
        return None
    return _clean_object(matches[0])
def list_properties(db, is_active: Optional[bool] = None, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """List non-deleted properties, newest first.

    ``is_active`` filters on the flag only when it is not ``None``.
    """
    criteria = {"deleted": False}
    if is_active is not None:
        criteria["is_active"] = is_active
    cursor = properties_col(db).find(criteria).skip(skip).limit(limit)
    cursor = cursor.sort("created_at", DESCENDING)
    return [_clean_object(item) for item in list(cursor)]
def list_properties_optimized(db, is_active: Optional[bool] = None, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """
    List non-deleted properties with their related data in one aggregation.

    Properties are sorted newest first and paged with ``skip``/``limit``
    before the joins run. Each result carries ``creator`` and
    ``specifications`` (unwound to single objects when present) and the
    arrays ``amenities``, ``images`` (active entries only) and ``documents``.
    """
    def related(collection_name, alias, active_only=False):
        # $lookup of child documents whose property_id equals the property's _id.
        spec = {
            "from": collection_name,
            "localField": "_id",
            "foreignField": "property_id",
            "as": alias,
        }
        if active_only:
            spec["pipeline"] = [{"$match": {"is_active": True}}]
        return {"$lookup": spec}

    def single(alias):
        # Collapse a 1-to-1 join array into an object without dropping unmatched rows.
        return {"$unwind": {"path": f"${alias}", "preserveNullAndEmptyArrays": True}}

    criteria = {"deleted": False}
    if is_active is not None:
        criteria["is_active"] = is_active

    # NOTE(review): the creator join returns the complete user document
    # (password_hash included); confirm callers strip it before responding.
    creator_join = {"$lookup": {
        "from": "users",
        "localField": "created_by",
        "foreignField": "_id",
        "as": "creator",
    }}
    stages = [
        {"$match": criteria},
        {"$sort": {"created_at": DESCENDING}},
        {"$skip": skip},
        {"$limit": limit},
        creator_join,
        single("creator"),
        related("property_specifications", "specifications"),
        single("specifications"),
        related("amenities", "amenities", active_only=True),
        related("property_images", "images", active_only=True),
        related("documents", "documents"),
    ]
    rows = list(properties_col(db).aggregate(stages))
    return [_clean_object(row) for row in rows]
def update_property(db, property_id: str, fields: Dict[str, Any], session=None) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to a property via ``$set`` and return the updated document.

    Args:
        db: Database handle.
        property_id: ID of the property to update.
        fields: Field/value pairs to set. The dict is not modified;
            ``updated_at`` is added to a copy.
        session: Optional client session forwarded to the update.

    Returns:
        The cleaned updated property, or ``None`` when no property has that ID.

    Raises:
        ValueError: If ``property_id`` is not a valid ObjectId.
    """
    logger.info("Updating property %s with fields: %s", property_id, list(fields.keys()))
    # Copy so the caller's dict does not silently gain an ``updated_at`` key.
    changes = {**fields, "updated_at": _now()}
    doc = properties_col(db).find_one_and_update(
        {"_id": _to_object_id(property_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not doc:
        return None
    logger.info("Property %s updated successfully", property_id)
    return _clean_object(doc)
def delete_property(db, property_id: str) -> bool:
    """Soft delete a property by flagging it deleted and inactive.

    Returns ``True`` when a property with that ID was matched, else ``False``.
    """
    print(f"[REPO] Soft deleting property {property_id}")
    target = {"_id": _to_object_id(property_id)}
    flags = {"deleted": True, "is_active": False, "updated_at": _now()}
    outcome = properties_col(db).update_one(target, {"$set": flags})
    if outcome.matched_count > 0:
        print("[REPO] Property soft deleted successfully")
        return True
    print("[REPO] Property not found for deletion")
    return False
def decrement_available_tokens(db, property_id: str, amount: int, session=None) -> bool:
    """Atomically take ``amount`` tokens out of a property's available pool.

    The update only matches when ``available_tokens >= amount``, so the
    check and the decrement are one operation. Returns ``True`` when a
    document matched, ``False`` otherwise.
    """
    print(f"[REPO] Decrementing {amount} tokens from property {property_id}")
    # NOTE(review): a negative amount always satisfies the guard and would
    # increase the pool; a missing property is reported with the same
    # "insufficient" message below.
    guard = {
        "_id": _to_object_id(property_id),
        "available_tokens": {"$gte": amount},
    }
    change = {
        "$inc": {"available_tokens": -amount},
        "$set": {"updated_at": _now()},
    }
    outcome = properties_col(db).update_one(guard, change, session=session)
    if outcome.matched_count > 0:
        print("[REPO] Successfully decremented tokens")
        return True
    print("[REPO] ERROR: Failed to decrement tokens - insufficient available")
    return False
# ============================================================================
# PROPERTY SPECIFICATIONS
# ============================================================================
def property_specifications_col(db) -> Collection:
    """Return the ``property_specifications`` collection of ``db``."""
    return db["property_specifications"]
def create_property_specification(db, property_id: str, spec_data: Dict[str, Any]) -> Dict[str, Any]:
    """Insert the specification document for a property and return it cleaned.

    ``property_id`` and both timestamps override same-named keys in ``spec_data``.
    """
    print(f"[REPO] Creating specifications for property {property_id}")
    timestamp = _now()
    spec_doc = dict(
        spec_data,
        property_id=_to_object_id(property_id),
        created_at=timestamp,
        updated_at=timestamp,
    )
    spec_doc["_id"] = property_specifications_col(db).insert_one(spec_doc).inserted_id
    return _clean_object(spec_doc)
def get_property_specification(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Return the cleaned specification document of a property, or ``None``."""
    spec = property_specifications_col(db).find_one({"property_id": _to_object_id(property_id)})
    if not spec:
        return None
    return _clean_object(spec)
# ============================================================================
# AMENITIES
# ============================================================================
def amenities_col(db) -> Collection:
    """Return the ``amenities`` collection of ``db``."""
    return db["amenities"]
def create_amenities(db, property_id: str, amenity_names: List[str]) -> List[Dict[str, Any]]:
    """Insert one active amenity per name for a property.

    Nothing is written when ``amenity_names`` is empty. Returns the cleaned
    amenity documents in input order.
    """
    print(f"[REPO] Creating {len(amenity_names)} amenities for property {property_id}")
    timestamp = _now()
    amenity_docs = [
        {
            "property_id": _to_object_id(property_id),
            "name": amenity_name,
            "is_active": True,
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        for amenity_name in amenity_names
    ]
    if amenity_docs:
        outcome = amenities_col(db).insert_many(amenity_docs)
        # Attach the generated IDs in insertion order.
        for amenity_doc, new_id in zip(amenity_docs, outcome.inserted_ids):
            amenity_doc["_id"] = new_id
    return [_clean_object(amenity_doc) for amenity_doc in amenity_docs]
def get_property_amenities(db, property_id: str) -> List[Dict[str, Any]]:
    """Return the cleaned active amenities of a property."""
    criteria = {"property_id": _to_object_id(property_id), "is_active": True}
    found = list(amenities_col(db).find(criteria))
    return [_clean_object(amenity) for amenity in found]
# ============================================================================
# PROPERTY IMAGES
# ============================================================================
def property_images_col(db) -> Collection:
    """Return the ``property_images`` collection of ``db``."""
    return db["property_images"]
def create_property_images(db, property_id: str, images_data: List[Dict[str, Any]], uploaded_by: str) -> List[Dict[str, Any]]:
    """Insert one active image document per entry of ``images_data``.

    Each entry's own keys are kept; ``property_id``, ``uploaded_by``,
    ``is_active`` and the timestamps override same-named keys. Nothing is
    written for an empty list. Returns the cleaned documents in input order.
    """
    print(f"[REPO] Creating {len(images_data)} images for property {property_id}")
    timestamp = _now()
    image_docs = [
        dict(
            image,
            property_id=_to_object_id(property_id),
            uploaded_by=_coerce_object_id(uploaded_by),
            is_active=True,
            created_at=timestamp,
            updated_at=timestamp,
        )
        for image in images_data
    ]
    if image_docs:
        outcome = property_images_col(db).insert_many(image_docs)
        # Attach the generated IDs in insertion order.
        for image_doc, new_id in zip(image_docs, outcome.inserted_ids):
            image_doc["_id"] = new_id
    return [_clean_object(image_doc) for image_doc in image_docs]
def get_property_images(db, property_id: str) -> List[Dict[str, Any]]:
    """Return the cleaned active images of a property."""
    criteria = {"property_id": _to_object_id(property_id), "is_active": True}
    found = list(property_images_col(db).find(criteria))
    return [_clean_object(image) for image in found]
# ============================================================================
# INVESTMENTS
# ============================================================================
def investments_col(db) -> Collection:
    """Return the ``investments`` collection of ``db``."""
    return db["investments"]
def create_investment(db, user_id: str, property_id: str, tokens_purchased: int, amount: float, status: str = "confirmed", profit_share: float = 0.0, session=None) -> Dict[str, Any]:
    """Insert an investment record and return it cleaned.

    ``session`` is forwarded to the insert so the write can take part in a
    caller-managed transaction.
    """
    print(f"[REPO] Creating investment: user={user_id}, property={property_id}, tokens={tokens_purchased}, amount={amount}")
    timestamp = _now()
    investment_doc = dict(
        user_id=_to_object_id(user_id),
        property_id=_to_object_id(property_id),
        tokens_purchased=tokens_purchased,
        amount=amount,
        status=status,
        profit_share=profit_share,
        created_at=timestamp,
        updated_at=timestamp,
    )
    new_id = investments_col(db).insert_one(investment_doc, session=session).inserted_id
    investment_doc["_id"] = new_id
    print(f"[REPO] Investment created with ID: {new_id}")
    return _clean_object(investment_doc)
def get_user_investments(db, user_id: str) -> List[Dict[str, Any]]:
    """Return every investment of a user, newest first, in cleaned form."""
    cursor = investments_col(db).find({"user_id": _to_object_id(user_id)})
    ordered = list(cursor.sort("created_at", DESCENDING))
    return [_clean_object(investment) for investment in ordered]
def get_investment_by_user_and_property(db, user_id: str, property_id: str) -> Optional[Dict[str, Any]]:
    """Return the cleaned investment a user holds in a property, or ``None``."""
    criteria = {
        "user_id": _to_object_id(user_id),
        "property_id": _to_object_id(property_id),
    }
    found = investments_col(db).find_one(criteria)
    if not found:
        return None
    return _clean_object(found)
def update_investment(db, investment_id: str, fields: Dict[str, Any], session=None) -> Optional[Dict[str, Any]]:
    """Apply ``fields`` to an investment via ``$set`` and return the updated document.

    Args:
        db: Database handle.
        investment_id: ID of the investment to update.
        fields: Field/value pairs to set. The dict is not modified;
            ``updated_at`` is added to a copy.
        session: Optional client session forwarded to the update.

    Returns:
        The cleaned updated investment, or ``None`` when the ID matches nothing.

    Raises:
        ValueError: If ``investment_id`` is not a valid ObjectId.
    """
    # Copy so the caller's dict does not silently gain an ``updated_at`` key.
    changes = {**fields, "updated_at": _now()}
    doc = investments_col(db).find_one_and_update(
        {"_id": _to_object_id(investment_id)},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(doc) if doc else None
def reduce_investment(db, user_id: str, property_id: str, tokens_to_reduce: int, session=None) -> Optional[Dict[str, Any]]:
    """Reduce a user's token holding in a property when tokens are sold back.

    The balance check and the decrement are a single conditional update, so
    concurrent sell-backs cannot both pass the check, and the operation runs
    on ``session`` throughout (the previous implementation read the
    investment outside the session). When the holding reaches zero the
    investment document is removed.

    Args:
        db: Database handle.
        user_id: Owner of the investment.
        property_id: Property the investment refers to.
        tokens_to_reduce: Number of tokens to take off the holding.
        session: Optional client session for transactional use.

    Returns:
        The cleaned investment after the reduction (``tokens_purchased`` is 0
        when it was removed), or ``None`` when no investment exists or it
        holds fewer than ``tokens_to_reduce`` tokens.

    Raises:
        ValueError: If ``user_id`` or ``property_id`` is not a valid ObjectId.
    """
    logger.info(
        "Reducing investment: user=%s, property=%s, tokens=%s",
        user_id, property_id, tokens_to_reduce,
    )
    collection = investments_col(db)
    updated = collection.find_one_and_update(
        {
            "user_id": _to_object_id(user_id),
            "property_id": _to_object_id(property_id),
            # Guard inside the filter: check and write happen atomically.
            "tokens_purchased": {"$gte": tokens_to_reduce},
        },
        {
            "$inc": {"tokens_purchased": -tokens_to_reduce},
            "$set": {"updated_at": _now()},
        },
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if updated is None:
        logger.warning(
            "Cannot reduce %s tokens for user=%s, property=%s: no investment or insufficient tokens",
            tokens_to_reduce, user_id, property_id,
        )
        return None
    if updated.get("tokens_purchased", 0) == 0:
        # All tokens sold: drop the record, but only if it is still empty.
        logger.info("Removing investment %s (all tokens sold)", updated["_id"])
        collection.delete_one({"_id": updated["_id"], "tokens_purchased": 0}, session=session)
    return _clean_object(updated)
def upsert_investment(db, user_id: str, property_id: str, tokens_purchased: int, amount: float, session=None) -> Dict[str, Any]:
    """Add tokens and amount to a user's investment in a property, creating it if needed.

    Existing investments are incremented; a new one starts from the given
    values with ``profit_share`` 0.0. The status is set to "confirmed" in
    both cases. Returns the cleaned document after the write.
    """
    print(f"[REPO] Upserting investment for user {user_id}, property {property_id}")
    timestamp = _now()
    holding = {
        "user_id": _to_object_id(user_id),
        "property_id": _to_object_id(property_id),
    }
    change = {
        "$inc": {"tokens_purchased": tokens_purchased, "amount": amount},
        "$set": {"status": "confirmed", "updated_at": timestamp},
        # Applied only when the upsert creates the document.
        "$setOnInsert": {"profit_share": 0.0, "created_at": timestamp},
    }
    result = investments_col(db).find_one_and_update(
        holding,
        change,
        upsert=True,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    return _clean_object(result)
def get_user_portfolio_optimized(db, user_id: str) -> List[Dict[str, Any]]:
    """
    Load a user's investments with all related property data in one aggregation.

    Investments are sorted newest first and joined with their property
    (investments whose property is missing or deleted are dropped), the
    property's specifications and tokens (each unwound to one object when
    present), and its active amenities and images.
    """
    def by_property(collection_name, alias):
        # Join child documents sharing the investment's property_id.
        return {"$lookup": {
            "from": collection_name,
            "localField": "property_id",
            "foreignField": "property_id",
            "as": alias,
        }}

    def active_by_property(collection_name, alias):
        # Same join, restricted to children flagged is_active.
        return {"$lookup": {
            "from": collection_name,
            "let": {"prop_id": "$property_id"},
            "pipeline": [
                {"$match": {
                    "$expr": {"$eq": ["$property_id", "$$prop_id"]},
                    "is_active": True,
                }}
            ],
            "as": alias,
        }}

    def unwind(alias, keep_unmatched):
        return {"$unwind": {"path": f"${alias}", "preserveNullAndEmptyArrays": keep_unmatched}}

    kept_fields = [
        "_id", "user_id", "property_id", "tokens_purchased", "amount", "status",
        "created_at", "updated_at", "property", "specifications", "amenities",
        "images", "tokens",
    ]
    stages = [
        {"$match": {"user_id": _to_object_id(user_id)}},
        {"$sort": {"created_at": DESCENDING}},
        {"$lookup": {
            "from": "properties",
            "localField": "property_id",
            "foreignField": "_id",
            "as": "property",
        }},
        # Investments without a matching property are discarded here.
        unwind("property", False),
        {"$match": {"property.deleted": False}},
        by_property("property_specifications", "specifications"),
        unwind("specifications", True),
        active_by_property("amenities", "amenities"),
        active_by_property("property_images", "images"),
        by_property("property_tokens", "tokens"),
        unwind("tokens", True),
        {"$project": dict.fromkeys(kept_fields, 1)},
    ]
    rows = list(investments_col(db).aggregate(stages))
    return [_clean_object(row) for row in rows]
# ============================================================================
# TRANSACTIONS
# ============================================================================
def transactions_col(db) -> Collection:
    """Return the ``transactions`` collection of ``db``."""
    return db["transactions"]
def create_transaction(db, user_id: str, wallet_id: Optional[str], tx_type: str, amount: float,
                       property_id: Optional[str] = None, status: str = "pending",
                       metadata: Optional[Dict[str, Any]] = None, session=None) -> Dict[str, Any]:
    """Insert a transaction record and return it cleaned.

    ``wallet_id`` and ``property_id`` are stored as ``None`` when falsy;
    ``metadata`` defaults to an empty dict. ``session`` is forwarded to the
    insert.
    """
    print(f"[REPO] Creating transaction: type={tx_type}, amount={amount}, status={status}")
    timestamp = _now()
    owner_ref = _to_object_id(user_id)
    wallet_ref = None
    if wallet_id:
        wallet_ref = _to_object_id(wallet_id)
    property_ref = None
    if property_id:
        property_ref = _to_object_id(property_id)
    tx_doc = dict(
        user_id=owner_ref,
        wallet_id=wallet_ref,
        type=tx_type,
        amount=amount,
        property_id=property_ref,
        status=status,
        metadata=metadata or {},
        created_at=timestamp,
        updated_at=timestamp,
    )
    new_id = transactions_col(db).insert_one(tx_doc, session=session).inserted_id
    tx_doc["_id"] = new_id
    print(f"[REPO] Transaction created with ID: {new_id}")
    return _clean_object(tx_doc)
def insert_transaction(db, transaction_data: Dict[str, Any], session=None) -> Dict[str, Any]:
    """Compatibility layer for legacy code paths expecting integer fils fields.

    Translates a legacy transaction dict into a ``create_transaction`` call.
    Supports fields:
    - total_amount_aed (int fils), used only when ``amount`` is absent
    - amount (float) (legacy investment amount)
    - direction (credit/debit informational)
    - notes/payment_method/payment_reference

    Ancillary fields are folded into ``metadata``. The caller's
    ``transaction_data`` and its ``metadata`` dict are left unmodified.

    Returns:
        The cleaned transaction document created by ``create_transaction``.
    """
    amount = transaction_data.get("amount")
    total_amount_fils = transaction_data.get("total_amount_aed")
    # Work on a copy: the caller's metadata dict must not pick up extra keys.
    metadata = dict(transaction_data.get("metadata") or {})
    if total_amount_fils is not None and amount is None:
        # Legacy naming: convert fils to a float amount for storage uniformity.
        try:
            amount = float(total_amount_fils) / 100.0
            metadata["total_amount_fils"] = int(total_amount_fils)
        except (TypeError, ValueError, OverflowError) as exc:
            # Same fallback as before (amount becomes 0.0), but no longer silent.
            logger.warning(
                "insert_transaction: unusable total_amount_aed %r (%s); storing amount 0.0",
                total_amount_fils, exc,
            )
            amount = 0.0
    # Embed ancillary fields into metadata to avoid schema drift
    ancillary_keys = (
        "notes", "payment_method", "payment_reference", "price_per_token", "xrp_cost",
        "blockchain_tx_hash", "direction", "counterparty_user_id", "counterparty_username",
        "counterparty_email", "property_name",
    )
    for key in ancillary_keys:
        if transaction_data.get(key) is not None:
            metadata[key] = transaction_data[key]
    return create_transaction(
        db,
        user_id=transaction_data.get("user_id"),
        wallet_id=transaction_data.get("wallet_id"),
        tx_type=transaction_data.get("type"),
        amount=amount or 0.0,
        property_id=transaction_data.get("property_id"),
        status=transaction_data.get("status", "pending"),
        metadata=metadata,
        session=session,
    )
def get_user_transactions(db, user_id: str, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return one page of a user's transactions, newest first, in cleaned form."""
    cursor = transactions_col(db).find({"user_id": _to_object_id(user_id)})
    cursor = cursor.skip(skip).limit(limit).sort("created_at", DESCENDING)
    return [_clean_object(tx) for tx in list(cursor)]
def update_transaction_status(db, transaction_id: str, status: str, session=None) -> Optional[Dict[str, Any]]:
    """Set the status of a transaction and refresh ``updated_at``.

    Returns the cleaned transaction after the update, or ``None`` when the
    ID matches nothing.
    """
    print(f"[REPO] Updating transaction {transaction_id} status to {status}")
    change = {"$set": {"status": status, "updated_at": _now()}}
    updated = transactions_col(db).find_one_and_update(
        {"_id": _to_object_id(transaction_id)},
        change,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not updated:
        return None
    return _clean_object(updated)
# ============================================================================
# PORTFOLIOS
# ============================================================================
def portfolios_col(db) -> Collection:
    """Return the ``portfolios`` collection of ``db``."""
    return db["portfolios"]
def create_or_update_portfolio(db, user_id: str, session=None) -> Dict[str, Any]:
    """Recompute a user's portfolio totals from their investments and upsert them.

    ``total_invested`` is the sum of investment amounts. ``total_current_value``
    is tokens held times the current ``token_price`` of each non-deleted
    property, and ``total_profit`` is the difference.

    All reads run on ``session`` as well as the final write, so investments
    written earlier in the same transaction are included in the totals.
    Property prices are loaded with one query instead of one per investment.

    Args:
        db: Database handle.
        user_id: User whose portfolio is recalculated.
        session: Optional client session for transactional use.

    Returns:
        The cleaned portfolio document after the upsert.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId.
    """
    logger.info("Calculating portfolio for user %s", user_id)
    owner_id = _to_object_id(user_id)

    investments = list(
        investments_col(db)
        .find({"user_id": owner_id}, session=session)
        .sort("created_at", DESCENDING)
    )
    total_invested = sum(inv.get("amount", 0) for inv in investments)

    # Load the current token price of every referenced property in one query.
    token_prices = {}
    property_ids = list({inv["property_id"] for inv in investments})
    if property_ids:
        price_cursor = properties_col(db).find(
            {"_id": {"$in": property_ids}, "deleted": False},
            projection={"token_price": 1},
            session=session,
        )
        for prop in price_cursor:
            token_prices[prop["_id"]] = prop.get("token_price", 0)

    # Investments in missing or deleted properties contribute no current value.
    total_current_value = sum(
        inv.get("tokens_purchased", 0) * token_prices[inv["property_id"]]
        for inv in investments
        if inv["property_id"] in token_prices
    )
    total_profit = total_current_value - total_invested

    now = _now()
    doc = portfolios_col(db).find_one_and_update(
        {"user_id": owner_id},
        {
            "$set": {
                "total_invested": total_invested,
                "total_current_value": total_current_value,
                "total_profit": total_profit,
                "updated_at": now,
            },
            "$setOnInsert": {
                "created_at": now,
            },
        },
        upsert=True,
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    logger.info(
        "Portfolio updated: invested=%s, current=%s, profit=%s",
        total_invested, total_current_value, total_profit,
    )
    return _clean_object(doc)
def get_user_portfolio(db, user_id: str) -> Optional[Dict[str, Any]]:
    """Return the cleaned stored portfolio of a user, or ``None`` if none exists."""
    portfolio = portfolios_col(db).find_one({"user_id": _to_object_id(user_id)})
    if not portfolio:
        return None
    return _clean_object(portfolio)
# ============================================================================
# DOCUMENTS
# ============================================================================
def documents_col(db) -> Collection:
    """Return the ``documents`` collection of ``db``."""
    return db["documents"]
def create_document(db, property_id: str, file_type: str, file_url: str, uploaded_by: str) -> Dict[str, Any]:
    """Insert a single document record for a property and return it cleaned."""
    print(f"[REPO] Creating document for property {property_id}, type: {file_type}")
    timestamp = _now()
    record = dict(
        property_id=_to_object_id(property_id),
        file_type=file_type,
        file_url=file_url,
        uploaded_by=_coerce_object_id(uploaded_by),
        created_at=timestamp,
        updated_at=timestamp,
    )
    record["_id"] = documents_col(db).insert_one(record).inserted_id
    return _clean_object(record)
def create_property_documents(db, property_id: str, documents_data: List[Dict[str, str]], uploaded_by: str) -> List[Dict[str, Any]]:
    """Insert one document record per entry of ``documents_data``.

    Missing ``file_type`` defaults to "pdf" and missing ``file_url`` to an
    empty string. Nothing is written for an empty list. Returns the cleaned
    records in input order.
    """
    print(f"[REPO] Creating {len(documents_data)} documents for property {property_id}")
    timestamp = _now()
    records = [
        {
            "property_id": _to_object_id(property_id),
            "file_type": entry.get("file_type", "pdf"),
            "file_url": entry.get("file_url", ""),
            "uploaded_by": _coerce_object_id(uploaded_by),
            "created_at": timestamp,
            "updated_at": timestamp,
        }
        for entry in documents_data
    ]
    if records:
        outcome = documents_col(db).insert_many(records)
        # Attach the generated IDs in insertion order.
        for record, new_id in zip(records, outcome.inserted_ids):
            record["_id"] = new_id
    return [_clean_object(record) for record in records]
def get_property_documents(db, property_id: str) -> List[Dict[str, Any]]:
    """Return every document record of a property in cleaned form."""
    found = list(documents_col(db).find({"property_id": _to_object_id(property_id)}))
    return [_clean_object(record) for record in found]
# ============================================================================
# TRANSACTIONS (MISSING FUNCTIONS)
# ============================================================================
def list_user_transactions(db, user_id: str, skip: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
    """Return one page of a user's transactions, newest first.

    Alias of ``get_user_transactions``, kept for callers that use this name;
    it delegates so the query exists in one place only.

    Args:
        db: Database handle.
        user_id: User whose transactions are listed.
        skip: Number of records to skip.
        limit: Maximum number of records to return.

    Returns:
        Cleaned transaction dicts.

    Raises:
        ValueError: If ``user_id`` is not a valid ObjectId.
    """
    return get_user_transactions(db, user_id, skip=skip, limit=limit)
def list_aed_wallet_transactions(db, user_id: str, limit: int = 50) -> List[Dict[str, Any]]:
    """List a user's AED wallet transactions, newest first.

    Only transactions of type wallet_add, wallet_deduct or investment are
    returned, capped at ``limit``.
    """
    # The filter is by transaction type only; metadata is not inspected.
    wallet_types = ["wallet_add", "wallet_deduct", "investment"]
    criteria = {
        "user_id": _to_object_id(user_id),
        "type": {"$in": wallet_types},
    }
    cursor = transactions_col(db).find(criteria).sort("created_at", DESCENDING).limit(limit)
    return [_clean_object(tx) for tx in list(cursor)]
def record_admin_wallet_event(
    db,
    delta_fils: int,
    event_type: str,
    notes: str,
    *,
    counterparty_user: Optional[Dict[str, Any]] = None,
    property_obj: Optional[Dict[str, Any]] = None,
    payment_method: Optional[str] = None,
    payment_reference: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    session=None,
):
    """Adjust an admin's AED wallet balance and mirror it as a transaction record.

    The affected admin is the creator of ``property_obj`` (its ``created_by``)
    when that user can be loaded, otherwise the primary admin. The wallet
    ``balance`` is kept in AED, so ``delta_fils`` is divided by 100 before it
    is applied. Direction ("credit"/"debit") is inferred from the sign.

    Args:
        db: Database handle.
        delta_fils: Signed amount in fils; falsy or unparseable values count as 0.
        event_type: Stored as the transaction ``type``.
        notes: Free-text note stored in the transaction metadata.
        counterparty_user: Optional user dict; its id/name/email are copied
            into the metadata.
        property_obj: Optional property dict; supplies creator, title and id.
        payment_method: Optional payment method stored in the metadata.
        payment_reference: Optional payment reference stored in the metadata.
        metadata: Extra metadata merged last, so it can override the keys above.
        session: Optional MongoDB session used for the primary-admin lookup,
            the wallet read and write, and the transaction insert.

    Returns:
        Whatever ``insert_transaction`` returns, or ``None`` when no admin
        user could be resolved. A transaction is recorded even when the delta
        is 0 or the admin has no wallet (``wallet_id`` is then ``None``).
    """
    try:
        delta = int(delta_fils or 0)
    except (TypeError, ValueError, OverflowError):
        # Best effort: an unparseable amount is recorded as a zero-value event,
        # but leave a trace instead of swallowing the problem silently.
        logger.warning("record_admin_wallet_event: invalid delta_fils %r; treating as 0", delta_fils)
        delta = 0

    # Determine which admin to credit: property creator first, primary admin as fallback.
    admin_doc = None
    if property_obj and property_obj.get('created_by'):
        admin_doc = get_user_by_id(db, property_obj['created_by'])
        if admin_doc:
            logger.info(f"record_admin_wallet_event: crediting property creator {admin_doc.get('email')}")
    if not admin_doc:
        admin_doc = get_primary_admin(db, session=session)
        if admin_doc:
            logger.info(f"record_admin_wallet_event: crediting primary admin {admin_doc.get('email')}")
    if not admin_doc:
        logger.warning("record_admin_wallet_event: no admin user found")
        return None

    direction = "credit" if delta >= 0 else "debit"
    amount_abs = abs(delta)

    admin_wallet_id = None
    if delta != 0:
        # Read the wallet inside the same session as the write so that both
        # take part in the caller's transaction (if any).
        admin_wallet = wallets_col(db).find_one(
            {"user_id": _to_object_id(admin_doc["id"])},
            session=session,
        )
        if admin_wallet:
            admin_wallet_id = str(admin_wallet["_id"])
            # Wallet balance is stored in AED; convert from fils.
            delta_aed = float(delta) / 100.0
            wallets_col(db).update_one(
                {"_id": admin_wallet["_id"]},
                {"$inc": {"balance": delta_aed}},
                session=session
            )
        else:
            logger.warning(f"record_admin_wallet_event: admin {admin_doc['id']} has no wallet")

    # Build metadata with all transaction details.
    tx_metadata: Dict[str, Any] = {
        "direction": direction,
        "notes": notes,
    }
    if property_obj:
        tx_metadata["property_name"] = property_obj.get("title")
    if counterparty_user:
        tx_metadata["counterparty_user_id"] = counterparty_user.get("id")
        if counterparty_user.get("name"):
            tx_metadata["counterparty_username"] = counterparty_user.get("name")
        if counterparty_user.get("email"):
            tx_metadata["counterparty_email"] = counterparty_user.get("email")
    if payment_method:
        tx_metadata["payment_method"] = payment_method
    if payment_reference:
        tx_metadata["payment_reference"] = payment_reference
    if metadata:
        tx_metadata.update(metadata)  # caller-supplied keys win

    tx_data: Dict[str, Any] = {
        "user_id": admin_doc["id"],
        "wallet_id": admin_wallet_id,
        "type": event_type,
        "amount": float(amount_abs) / 100.0,  # absolute amount in AED
        "status": "completed",
        "metadata": tx_metadata,
    }
    if property_obj:
        # property_obj id may be a plain string
        tx_data["property_id"] = property_obj.get("id")
    return insert_transaction(db, tx_data, session=session)
def upsert_token(db, property_id: str, token_data: Dict[str, Any]) -> Dict[str, Any]:
    """Create or update the token record of a property in ``db.tokens``.

    The record is matched on ``property_id`` exactly as given (no ObjectId
    conversion). ``token_data`` is written with ``$set``; ``property_id`` and
    ``updated_at`` always override same-named keys in ``token_data``, and
    ``created_at`` is only written when the record is inserted.

    Returns the stored record after the update, cleaned for output.
    """
    print(f"[REPO] Upserting token record for property {property_id}")
    timestamp = _now()
    fields_to_set = dict(token_data)
    fields_to_set["property_id"] = property_id
    fields_to_set["updated_at"] = timestamp
    update_spec = {
        "$set": fields_to_set,
        "$setOnInsert": {"created_at": timestamp},
    }
    stored = db.tokens.find_one_and_update(
        {"property_id": property_id},
        update_spec,
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    return _clean_object(stored)
def get_property_token(db, property_id: str) -> Optional[Dict[str, Any]]:
    """Return the token record of a property from ``db.tokens``, or ``None`` if absent."""
    stored = db.tokens.find_one({"property_id": property_id})
    if not stored:
        return None
    return _clean_object(stored)
# ============================================================================
# STATISTICS
# ============================================================================
def get_admin_stats(db) -> Dict[str, Any]:
    """Compute platform-wide figures for the admin dashboard.

    Counts non-deleted users and properties, all investments and active
    users, then aggregates ``tokens_purchased`` and ``amount`` over every
    investment. Revenue is 2% of the total volume.
    """
    print("[REPO] Calculating admin statistics...")
    user_count = users_col(db).count_documents({"deleted": False})
    property_count = properties_col(db).count_documents({"deleted": False})
    investment_count = investments_col(db).count_documents({})
    active_user_count = users_col(db).count_documents({"is_active": True, "deleted": False})

    tokens_sold = 0
    volume = 0.0
    revenue = 0.0
    totals_pipeline = [
        {"$group": {
            "_id": None,
            "total_tokens": {"$sum": "$tokens_purchased"},
            "total_amount": {"$sum": "$amount"},
        }}
    ]
    aggregated = list(investments_col(db).aggregate(totals_pipeline))
    if aggregated:
        totals = aggregated[0]
        tokens_sold = totals.get("total_tokens", 0)
        volume = totals.get("total_amount", 0.0)
        revenue = volume * 0.02  # 2% platform fee

    stats = {
        "total_users": user_count,
        "total_properties": property_count,
        "total_investments": investment_count,
        "active_users": active_user_count,
        "total_tokens_sold": tokens_sold,
        "total_volume": volume,
        "total_revenue": revenue,
    }
    print(f"[REPO] Stats calculated: {stats}")
    return stats
def get_admin_specific_stats(db, admin_user_id: str) -> Dict[str, Any]:
    """Compute dashboard figures restricted to one admin's properties.

    ``total_users`` stays platform-wide; every other figure only considers
    non-deleted properties whose ``created_by`` matches the admin and the
    investments made in them. ``active_users`` is the number of distinct
    ``user_id`` values among those investments. Revenue is 2% of the volume.
    """
    print(f"[REPO] Calculating admin-specific statistics for admin: {admin_user_id}")
    admin_oid = _coerce_object_id(admin_user_id)

    # Only the ids are needed, so do not pull whole property documents.
    admin_property_ids = [
        prop["_id"]
        for prop in properties_col(db).find(
            {"created_by": admin_oid, "deleted": False},
            {"_id": 1},
        )
    ]
    print(f"[REPO] Found {len(admin_property_ids)} properties created by admin")

    # Fetch just the fields that feed the totals below.
    admin_investments = list(investments_col(db).find(
        {"property_id": {"$in": admin_property_ids}},
        {"user_id": 1, "tokens_purchased": 1, "amount": 1},
    ))

    unique_buyers = len({inv["user_id"] for inv in admin_investments if inv.get("user_id")})

    # Skip missing/None values instead of letting sum() fail on a None.
    total_tokens_sold = sum(
        inv["tokens_purchased"]
        for inv in admin_investments
        if inv.get("tokens_purchased") is not None
    )
    total_volume = sum(
        inv["amount"] for inv in admin_investments if inv.get("amount") is not None
    )
    total_revenue = total_volume * 0.02  # 2% platform fee

    stats = {
        "total_users": users_col(db).count_documents({"deleted": False}),  # Platform-wide
        "total_properties": len(admin_property_ids),  # Admin's properties only
        "total_investments": len(admin_investments),  # Investments in admin's properties
        "active_users": unique_buyers,  # Buyers who invested in admin's properties
        "total_tokens_sold": total_tokens_sold,
        "total_volume": total_volume,
        "total_revenue": total_revenue,
    }
    print(f"[REPO] Admin-specific stats calculated: {stats}")
    return stats
def get_properties_by_creator(db, creator_id: str) -> List[Dict[str, Any]]:
    """Return all non-deleted properties whose ``created_by`` matches ``creator_id``."""
    selector = {"created_by": _coerce_object_id(creator_id), "deleted": False}
    cursor = properties_col(db).find(selector)
    return [_clean_object(prop) for prop in cursor]
# ============================================================================
# RENT DISTRIBUTION OPERATIONS
# ============================================================================
def rent_distributions_col(db) -> Collection:
    """Return the ``rent_distributions`` collection of ``db``."""
    collection = db.rent_distributions
    return collection
def rent_payments_col(db) -> Collection:
    """Return the ``rent_payments`` collection of ``db``."""
    collection = db.rent_payments
    return collection
def get_investors_by_property(db, property_id: str) -> List[Dict[str, Any]]:
    """Return one aggregated entry per user who invested in a property.

    Investments are grouped by ``user_id`` and joined with the ``users``
    collection; groups without a matching user are dropped by the
    ``$unwind`` stage. Each entry holds ``user_id`` (string),
    ``tokens_purchased``, ``total_invested``, ``wallet_address``, ``email``
    and ``name``.

    Raises ``ValueError`` (from ``_to_object_id``) for an invalid ``property_id``.
    """
    print(f"[REPO] Getting investors for property: {property_id}")
    property_oid = _to_object_id(property_id)
    pipeline = [
        {"$match": {"property_id": property_oid}},
        {"$group": {
            "_id": "$user_id",
            "total_tokens": {"$sum": "$tokens_purchased"},
            # The other investment aggregations in this module read `amount`;
            # fall back to it when `investment_amount` is absent so the total
            # is not silently reported as 0.
            "total_invested": {"$sum": {"$ifNull": ["$investment_amount", "$amount"]}},
        }},
        {"$lookup": {
            "from": "users",
            "localField": "_id",
            "foreignField": "_id",
            "as": "user_info",
        }},
        {"$unwind": "$user_info"},
        {"$project": {
            "user_id": "$_id",
            "tokens_purchased": "$total_tokens",
            "total_invested": "$total_invested",
            "wallet_address": "$user_info.wallet_address",
            "email": "$user_info.email",
            "name": "$user_info.name",
        }},
    ]
    result = [
        {
            "user_id": str(inv["user_id"]),
            "tokens_purchased": inv.get("tokens_purchased", 0),
            "total_invested": inv.get("total_invested", 0),
            "wallet_address": inv.get("wallet_address"),
            "email": inv.get("email"),
            "name": inv.get("name"),
        }
        for inv in investments_col(db).aggregate(pipeline)
    ]
    print(f"[REPO] Found {len(result)} investors for property {property_id}")
    return result
def create_rent_distribution(db, property_id: str, total_rent_amount: float,
                             rent_period_start: str, rent_period_end: str,
                             total_tokens: int, rent_per_token: float,
                             distribution_date: datetime, notes: Optional[str] = None,
                             session=None) -> Dict[str, Any]:
    """Insert a new rent distribution record in ``pending`` status.

    Investor and completed-payment counters start at 0. The insert runs in
    ``session`` when one is given. Returns the stored record, cleaned for output.
    """
    print(f"[REPO] Creating rent distribution for property: {property_id}, amount: {total_rent_amount}")
    timestamp = _now()
    distribution = dict(
        property_id=_to_object_id(property_id),
        total_rent_amount=total_rent_amount,
        total_tokens=total_tokens,
        rent_per_token=rent_per_token,
        rent_period_start=rent_period_start,
        rent_period_end=rent_period_end,
        distribution_date=distribution_date,
        total_investors=0,
        payments_completed=0,
        status="pending",
        notes=notes,
        created_at=timestamp,
        updated_at=timestamp,
    )
    outcome = rent_distributions_col(db).insert_one(distribution, session=session)
    distribution["_id"] = outcome.inserted_id
    print(f"[REPO] Rent distribution created with ID: {outcome.inserted_id}")
    return _clean_object(distribution)
def update_rent_distribution_status(db, distribution_id: str, status: str,
                                    total_investors: int, payments_completed: int,
                                    session=None) -> Dict[str, Any]:
    """Set the status and counters of a rent distribution.

    Returns the record as stored after the update (cleaned), or ``None`` when
    no distribution with ``distribution_id`` exists.
    """
    print(f"[REPO] Updating rent distribution {distribution_id}: status={status}, payments={payments_completed}/{total_investors}")
    collection = rent_distributions_col(db)
    selector = {"_id": _to_object_id(distribution_id)}
    changes = {
        "status": status,
        "total_investors": total_investors,
        "payments_completed": payments_completed,
        "updated_at": _now(),
    }
    refreshed = collection.find_one_and_update(
        selector,
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not refreshed:
        return None
    return _clean_object(refreshed)
def create_rent_payment(db, distribution_id: str, user_id: str, property_id: str,
                        tokens_owned: int, rent_amount: float, rent_period_start: str,
                        rent_period_end: str, payment_status: str = "pending",
                        wallet_credited: bool = False, transaction_id: Optional[str] = None,
                        session=None) -> Dict[str, Any]:
    """Insert the rent payment record of one user for one distribution.

    ``payment_date`` is set to the creation time only when ``payment_status``
    is ``"completed"``; ``transaction_id`` is stored as an ObjectId when given.
    Returns the stored record, cleaned for output.
    """
    print(f"[REPO] Creating rent payment for user: {user_id}, amount: {rent_amount}")
    timestamp = _now()
    payment = dict(
        distribution_id=_to_object_id(distribution_id),
        user_id=_to_object_id(user_id),
        property_id=_to_object_id(property_id),
        tokens_owned=tokens_owned,
        rent_amount=rent_amount,
        rent_period_start=rent_period_start,
        rent_period_end=rent_period_end,
        payment_status=payment_status,
        wallet_credited=wallet_credited,
        transaction_id=_to_object_id(transaction_id) if transaction_id else None,
        payment_date=timestamp if payment_status == "completed" else None,
        created_at=timestamp,
    )
    outcome = rent_payments_col(db).insert_one(payment, session=session)
    payment["_id"] = outcome.inserted_id
    print(f"[REPO] Rent payment created with ID: {outcome.inserted_id}")
    return _clean_object(payment)
def get_user_rent_payments(db, user_id: str, property_id: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return a user's rent payments sorted by ``payment_date`` descending.

    When ``property_id`` is given, only payments for that property are returned.
    """
    print(f"[REPO] Getting rent payments for user: {user_id}")
    selector = {"user_id": _to_object_id(user_id)}
    if property_id:
        selector["property_id"] = _to_object_id(property_id)
    cursor = rent_payments_col(db).find(selector).sort("payment_date", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def get_rent_distribution_by_id(db, distribution_id: str) -> Optional[Dict[str, Any]]:
    """Look up a rent distribution by id; returns ``None`` when it does not exist."""
    collection = rent_distributions_col(db)
    found = collection.find_one({"_id": _to_object_id(distribution_id)})
    if not found:
        return None
    return _clean_object(found)
def get_user_rent_summary(db, user_id: str, property_id: str) -> Dict[str, Any]:
    """Summarise the completed rent payments of a user for one property.

    Only payments with ``payment_status == "completed"`` count towards the
    totals. The "last" payment is the first completed one in the order
    returned by ``get_user_rent_payments``. ``average_monthly_rent`` is the
    total divided by the number of completed payments.
    """
    print(f"[REPO] Getting rent summary for user: {user_id}, property: {property_id}")
    payments = get_user_rent_payments(db, user_id, property_id)

    summary = {
        "property_id": property_id,
        "total_rent_received": 0.0,
        "total_rent_payments": 0,
        "last_rent_amount": None,
        "last_rent_date": None,
        "average_monthly_rent": None,
    }
    if not payments:
        return summary

    completed = [p for p in payments if p.get("payment_status") == "completed"]
    total_received = sum(p.get("rent_amount", 0.0) for p in completed)
    summary["total_rent_received"] = total_received
    summary["total_rent_payments"] = len(completed)
    if completed:
        latest = completed[0]
        summary["last_rent_amount"] = latest.get("rent_amount")
        summary["last_rent_date"] = latest.get("payment_date")
        summary["average_monthly_rent"] = total_received / len(completed)
    return summary
# =====================================================================
# SESSION MANAGEMENT
# =====================================================================
def sessions_col(db):
    """Return the ``sessions`` collection of ``db``."""
    collection = db.sessions
    return collection
def create_session(
    db,
    session_id: str,
    user_id: str,
    device_fingerprint: str,
    token: str,
    expires_at: datetime
) -> Dict[str, Any]:
    """
    Store a new, active authentication session.

    Args:
        db: Database connection
        session_id: Unique session identifier
        user_id: User's ID (stored as ObjectId)
        device_fingerprint: Device fingerprint hash
        token: JWT token (stored as given)
        expires_at: Session expiration timestamp

    Returns:
        The created session document, cleaned for output
    """
    print(f"[REPO] Creating session for user: {user_id}")
    owner_oid = _to_object_id(user_id)
    created_at = _now()
    last_activity = _now()
    record = {
        "session_id": session_id,
        "user_id": owner_oid,
        "device_fingerprint": device_fingerprint,
        "token": token,
        "created_at": created_at,
        "last_activity": last_activity,
        "expires_at": expires_at,
        "is_active": True,
    }
    sessions_col(db).insert_one(record)
    return _clean_object(record)
def get_session_by_id(db, session_id: str) -> Optional[Dict[str, Any]]:
    """Return the active session with this ``session_id``, or ``None``."""
    found = sessions_col(db).find_one({"session_id": session_id, "is_active": True})
    if not found:
        return None
    return _clean_object(found)
def get_session_by_token(db, token: str) -> Optional[Dict[str, Any]]:
    """Return the active session stored for this JWT token, or ``None``."""
    found = sessions_col(db).find_one({"token": token, "is_active": True})
    if not found:
        return None
    return _clean_object(found)
def validate_session(
    db,
    session_id: str,
    device_fingerprint: str
) -> bool:
    """
    Check that an active session exists, is not expired and matches the device.

    An expired session is invalidated as a side effect. On success the
    session's ``last_activity`` is refreshed.

    Args:
        db: Database connection
        session_id: Session identifier
        device_fingerprint: Current device fingerprint

    Returns:
        True if session is valid, False otherwise
    """
    record = get_session_by_id(db, session_id)
    if not record:
        print(f"[REPO] Session not found: {session_id}")
        return False

    expiry = record.get("expires_at")
    if expiry and expiry < _now():
        print(f"[REPO] Session expired: {session_id}")
        invalidate_session(db, session_id)
        return False

    stored_fingerprint = record.get("device_fingerprint")
    if stored_fingerprint != device_fingerprint:
        print(f"[REPO] Device fingerprint mismatch for session: {session_id}")
        return False

    # Valid session: record the activity.
    touch = {"$set": {"last_activity": _now()}}
    sessions_col(db).update_one({"session_id": session_id}, touch)
    return True
def invalidate_session(db, session_id: str) -> bool:
    """Mark a session inactive (logout); returns True when a document was modified."""
    print(f"[REPO] Invalidating session: {session_id}")
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = sessions_col(db).update_one({"session_id": session_id}, deactivate)
    return outcome.modified_count > 0
def invalidate_user_sessions(db, user_id: str) -> int:
    """Mark every active session of a user inactive; returns how many were modified."""
    print(f"[REPO] Invalidating all sessions for user: {user_id}")
    collection = sessions_col(db)
    active_for_user = {"user_id": _to_object_id(user_id), "is_active": True}
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = collection.update_many(active_for_user, deactivate)
    return outcome.modified_count
def get_user_active_sessions(db, user_id: str) -> List[Dict[str, Any]]:
    """Return a user's active sessions, most recently active first."""
    collection = sessions_col(db)
    active_for_user = {"user_id": _to_object_id(user_id), "is_active": True}
    cursor = collection.find(active_for_user).sort("last_activity", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def cleanup_expired_sessions(db) -> int:
    """Deactivate active sessions whose ``expires_at`` is in the past.

    Intended for periodic runs; returns the number of sessions modified.
    """
    print("[REPO] Cleaning up expired sessions")
    collection = sessions_col(db)
    expired_but_active = {"expires_at": {"$lt": _now()}, "is_active": True}
    deactivate = {"$set": {"is_active": False, "invalidated_at": _now()}}
    outcome = collection.update_many(expired_but_active, deactivate)
    return outcome.modified_count
# ============================================================================
# SECONDARY MARKET TRANSACTIONS OPERATIONS
# ============================================================================
def secondary_market_col(db) -> Collection:
    """Return the ``secondary_market_transactions`` collection of ``db``."""
    collection = db["secondary_market_transactions"]
    return collection
def create_market_transaction(
    db,
    transaction_id: str,
    transaction_type: str,
    property_id: str,
    property_title: str,
    token_currency: str,
    seller_id: str,
    seller_email: str,
    seller_xrp_address: str,
    buyer_id: str,
    buyer_email: str,
    buyer_xrp_address: str,
    tokens_amount: int,
    price_per_token: float,
    total_amount: float,
    blockchain_tx_hash: Optional[str] = None,
    notes: Optional[str] = None,
    session=None
) -> Dict[str, Any]:
    """
    Insert a secondary market transaction record.

    When ``blockchain_tx_hash`` is given the record starts as ``completed``
    with the blockchain confirmation and completion timestamps set;
    otherwise it starts as ``pending``. All ``db_*`` bookkeeping flags start
    as False and the currency is always ``AED``.

    Args:
        db: Database instance
        transaction_id: Unique transaction identifier
        transaction_type: Type of market transaction (sell_to_admin, etc.)
        property_id: Property ID
        property_title: Property title
        token_currency: Token currency code
        seller_id: Seller user ID
        seller_email: Seller email
        seller_xrp_address: Seller XRP address
        buyer_id: Buyer user ID (admin for sell_to_admin)
        buyer_email: Buyer email
        buyer_xrp_address: Buyer XRP address
        tokens_amount: Number of tokens traded
        price_per_token: Price per token
        total_amount: Total transaction amount
        blockchain_tx_hash: Blockchain transaction hash
        notes: Additional notes
        session: MongoDB session for transactions

    Returns:
        Created market transaction document, cleaned for output
    """
    logger.info(f"[REPO] Creating market transaction: {transaction_id}")
    on_chain = bool(blockchain_tx_hash)

    # Ids fall back to the raw value when they are not valid ObjectIds.
    property_ref = _coerce_object_id(property_id)
    seller_ref = _coerce_object_id(seller_id)
    buyer_ref = _coerce_object_id(buyer_id)

    confirmed_at = _now() if on_chain else None
    initiated_at = _now()
    completed_at = _now() if on_chain else None

    record = {
        "transaction_id": transaction_id,
        "transaction_type": transaction_type,
        "status": "completed" if on_chain else "pending",
        "property_id": property_ref,
        "property_title": property_title,
        "token_currency": token_currency,
        "seller_id": seller_ref,
        "seller_email": seller_email,
        "seller_xrp_address": seller_xrp_address,
        "buyer_id": buyer_ref,
        "buyer_email": buyer_email,
        "buyer_xrp_address": buyer_xrp_address,
        "tokens_amount": tokens_amount,
        "price_per_token": price_per_token,
        "total_amount": total_amount,
        "currency": "AED",
        "blockchain_tx_hash": blockchain_tx_hash,
        "blockchain_confirmed": on_chain,
        "blockchain_confirmed_at": confirmed_at,
        "db_investment_updated": False,
        "db_wallet_updated": False,
        "db_property_updated": False,
        "db_transaction_recorded": False,
        "initiated_at": initiated_at,
        "completed_at": completed_at,
        "notes": notes,
        "error_message": None,
    }
    outcome = secondary_market_col(db).insert_one(record, session=session)
    record["_id"] = outcome.inserted_id
    logger.info(f"[REPO] Market transaction created: {transaction_id}")
    return _clean_object(record)
def update_market_transaction_status(
    db,
    transaction_id: str,
    status: str,
    blockchain_tx_hash: Optional[str] = None,
    error_message: Optional[str] = None,
    db_updates: Optional[Dict[str, bool]] = None,
    session=None
) -> Optional[Dict[str, Any]]:
    """
    Update the status and related details of a market transaction.

    Args:
        db: Database instance
        transaction_id: Transaction ID to update
        status: New status (pending, completed, failed, etc.)
        blockchain_tx_hash: Blockchain transaction hash; also marks the
            transaction as blockchain-confirmed
        error_message: Error message if failed
        db_updates: Bookkeeping flags keyed by ``investment_updated``,
            ``wallet_updated``, ``property_updated`` and
            ``transaction_recorded``; each is stored with a ``db_`` prefix
        session: MongoDB session for transactions

    Returns:
        Updated market transaction document or None when it does not exist
    """
    logger.info(f"[REPO] Updating market transaction {transaction_id} to status: {status}")
    changes: Dict[str, Any] = {"status": status}

    if blockchain_tx_hash:
        changes.update(
            blockchain_tx_hash=blockchain_tx_hash,
            blockchain_confirmed=True,
            blockchain_confirmed_at=_now(),
        )
    if error_message:
        changes["error_message"] = error_message
    if status == "completed":
        changes["completed_at"] = _now()
    if db_updates:
        recognised_flags = (
            "investment_updated",
            "wallet_updated",
            "property_updated",
            "transaction_recorded",
        )
        for flag in recognised_flags:
            if flag in db_updates:
                changes[f"db_{flag}"] = db_updates[flag]

    updated = secondary_market_col(db).find_one_and_update(
        {"transaction_id": transaction_id},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
        session=session,
    )
    if not updated:
        logger.warning(f"[REPO] Market transaction {transaction_id} not found")
        return None
    logger.info(f"[REPO] Market transaction {transaction_id} updated successfully")
    return _clean_object(updated)
def get_user_market_history(
    db,
    user_id: str,
    transaction_type: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    List market transactions in which the user is seller or buyer.

    Args:
        db: Database instance
        user_id: User ID
        transaction_type: Filter by transaction type (optional)
        status: Filter by status (optional)
        limit: Maximum number of records to return

    Returns:
        Market transactions sorted by ``initiated_at`` descending
    """
    logger.info(f"[REPO] Fetching market history for user: {user_id}")
    selector: Dict[str, Any] = {
        "$or": [{role: _coerce_object_id(user_id)} for role in ("seller_id", "buyer_id")]
    }
    if transaction_type:
        selector["transaction_type"] = transaction_type
    if status:
        selector["status"] = status

    cursor = secondary_market_col(db).find(selector)
    cursor = cursor.sort("initiated_at", DESCENDING).limit(limit)
    history = list(cursor)
    logger.info(f"[REPO] Found {len(history)} market transactions for user {user_id}")
    return [_clean_object(entry) for entry in history]
def get_property_market_history(
    db,
    property_id: str,
    status: Optional[str] = None,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """
    List the market transactions recorded for a property.

    Args:
        db: Database instance
        property_id: Property ID
        status: Filter by status (optional)
        limit: Maximum number of records to return

    Returns:
        Market transactions sorted by ``initiated_at`` descending
    """
    logger.info(f"[REPO] Fetching market history for property: {property_id}")
    selector: Dict[str, Any] = {"property_id": _coerce_object_id(property_id)}
    if status:
        selector["status"] = status

    cursor = secondary_market_col(db).find(selector)
    cursor = cursor.sort("initiated_at", DESCENDING).limit(limit)
    history = list(cursor)
    logger.info(f"[REPO] Found {len(history)} market transactions for property {property_id}")
    return [_clean_object(entry) for entry in history]
def get_market_transaction(db, transaction_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a market transaction by its ``transaction_id``.

    Args:
        db: Database instance
        transaction_id: Transaction ID

    Returns:
        Market transaction document or None
    """
    found = secondary_market_col(db).find_one({"transaction_id": transaction_id})
    return _clean_object(found) if found else None
# ============================================================================
# CARDS OPERATIONS (Unified for both investor and admin)
# ============================================================================
def cards_col(db) -> Collection:
    """Return the ``cards`` collection of ``db`` (shared by investors and admins)."""
    collection = db["cards"]
    return collection
def _detect_card_type(card_number: str) -> str:
"""Detect card type from card number"""
try:
if card_number.startswith('4'):
return 'visa'
elif card_number.startswith(('51', '52', '53', '54', '55')) or (int(card_number[:6]) >= 222100 and int(card_number[:6]) <= 272099):
return 'mastercard'
elif card_number.startswith(('34', '37')):
return 'amex'
elif card_number.startswith('6011') or card_number.startswith('65'):
return 'discover'
except:
pass
return 'unknown'
def create_card(
    db,
    user_id: str,
    user_role: str,  # "investor" or "admin"
    card_number: str,
    expiry_month: int,
    expiry_year: int,
    cardholder_name: str,
    bank_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Register a card for a user/admin, keeping only its last four digits.

    The first active card of a user+role becomes the default one.

    Args:
        db: Database instance
        user_id: User's ID
        user_role: "investor" or "admin"
        card_number: Full card number (used for type detection; only the
            last 4 characters are stored)
        expiry_month: Expiry month (1-12)
        expiry_year: Expiry year (YYYY)
        cardholder_name: Name on card
        bank_name: Issuing bank name (optional)

    Returns:
        Created card document (no full card number)

    Raises:
        ValueError: If an active card with the same last four digits and
            expiry already exists for this user+role
    """
    logger.info(f"[REPO] Creating card for {user_role} user: {user_id}")
    detected_type = _detect_card_type(card_number)
    last_four = card_number[-4:]

    collection = cards_col(db)
    owner_oid = _to_object_id(user_id)

    # Reject duplicates of an already registered active card.
    duplicate = collection.find_one({
        "user_id": owner_oid,
        "user_role": user_role,
        "last_four": last_four,
        "expiry_month": expiry_month,
        "expiry_year": expiry_year,
        "is_active": True,
    })
    if duplicate:
        logger.warning(f"[REPO] Card with last four {last_four} already exists for {user_role} {user_id}")
        raise ValueError("Card already exists")

    # The very first active card becomes the default payment method.
    active_count = collection.count_documents({
        "user_id": owner_oid,
        "user_role": user_role,
        "is_active": True,
    })
    timestamp = _now()
    card = {
        "user_id": owner_oid,
        "user_role": user_role,
        "card_type": detected_type,
        "last_four": last_four,
        "cardholder_name": cardholder_name,
        "bank_name": bank_name or None,
        "expiry_month": expiry_month,
        "expiry_year": expiry_year,
        "is_default": active_count == 0,
        "is_active": True,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    outcome = collection.insert_one(card)
    card["_id"] = outcome.inserted_id
    logger.info(f"[REPO] Card created with ID: {outcome.inserted_id}")
    return _clean_object(card)
def get_cards(db, user_id: str, user_role: str) -> List[Dict[str, Any]]:
    """Return the active cards of a user/admin, newest first."""
    logger.info(f"[REPO] Getting cards for {user_role} user: {user_id}")
    collection = cards_col(db)
    selector = {
        "user_id": _to_object_id(user_id),
        "user_role": user_role,
        "is_active": True,
    }
    cursor = collection.find(selector).sort("created_at", DESCENDING)
    return [_clean_object(entry) for entry in cursor]
def delete_card(db, user_id: str, user_role: str, card_id: str) -> bool:
    """Soft-delete a user's/admin's card and hand over the default flag.

    The card is marked inactive and loses its ``is_default`` flag. When it
    was the active default, another active card of the same user+role (if
    any) becomes the default.

    Returns True when a card matching id, user and role was found.
    Raises ``ValueError`` (from ``_to_object_id``) for invalid ids.
    """
    logger.info(f"[REPO] Deleting card {card_id} for {user_role} user {user_id}")
    collection = cards_col(db)
    card_oid = _to_object_id(card_id)
    owner_oid = _to_object_id(user_id)

    # Fetch the pre-update document in the same atomic operation so we know
    # whether the card was the active default *before* it was deactivated.
    previous = collection.find_one_and_update(
        {"_id": card_oid, "user_id": owner_oid, "user_role": user_role},
        {"$set": {"is_active": False, "is_default": False, "updated_at": _now()}},
        return_document=ReturnDocument.BEFORE,
    )
    if previous is None:
        return False

    # Promote a replacement only when an active default was removed; deleting
    # an already inactive card again must not create a second default.
    if previous.get("is_active") and previous.get("is_default"):
        replacement = collection.find_one({
            "user_id": owner_oid,
            "user_role": user_role,
            "is_active": True,
        })
        if replacement:
            collection.update_one(
                {"_id": replacement["_id"]},
                {"$set": {"is_default": True}},
            )
    return True
def set_default_card(db, user_id: str, user_role: str, card_id: str) -> bool:
    """Make one active card the default payment method of a user+role.

    The target card is flagged first; the previous default is only cleared
    once the target is known to exist, so a wrong ``card_id`` no longer
    leaves the user without any default card.

    Returns True when the target card exists, is active and belongs to the
    user+role; False otherwise (nothing is changed in that case).
    Raises ``ValueError`` (from ``_to_object_id``) for invalid ids, before
    any write happens.
    """
    logger.info(f"[REPO] Setting card {card_id} as default for {user_role} user {user_id}")
    collection = cards_col(db)
    owner_oid = _to_object_id(user_id)
    card_oid = _to_object_id(card_id)

    outcome = collection.update_one(
        {
            "_id": card_oid,
            "user_id": owner_oid,
            "user_role": user_role,
            "is_active": True,
        },
        {"$set": {"is_default": True}},
    )
    # matched_count (not modified_count): a card that already is the default
    # still counts as success.
    if outcome.matched_count == 0:
        return False

    # Clear the flag on every other card of this user+role.
    collection.update_many(
        {
            "user_id": owner_oid,
            "user_role": user_role,
            "is_default": True,
            "_id": {"$ne": card_oid},
        },
        {"$set": {"is_default": False}},
    )
    return True
# Legacy aliases for backward compatibility
def create_user_card(db, user_id, card_number, expiry_month, expiry_year, cardholder_name, bank_name=None):
    """Legacy alias: create a card with the ``"investor"`` role via ``create_card``."""
    return create_card(
        db,
        user_id=user_id,
        user_role="investor",
        card_number=card_number,
        expiry_month=expiry_month,
        expiry_year=expiry_year,
        cardholder_name=cardholder_name,
        bank_name=bank_name,
    )
def get_user_cards(db, user_id):
    """Legacy alias: list the cards of an ``"investor"`` via ``get_cards``."""
    return get_cards(db, user_id=user_id, user_role="investor")
def delete_user_card(db, user_id, card_id):
    """Legacy alias: delete a card of an ``"investor"`` via ``delete_card``."""
    return delete_card(db, user_id=user_id, user_role="investor", card_id=card_id)
# ============================================================================
# BANKS OPERATIONS (Unified for both investor and admin)
# ============================================================================
def banks_col(db) -> Collection:
    """Return the ``banks`` collection of ``db`` (shared by investors and admins)."""
    collection = db["banks"]
    return collection
def create_bank(
    db,
    user_id: str,
    user_role: str,  # "investor" or "admin"
    bank_name: str,
    account_holder_name: str,
    account_number: str,
    account_type: str = "savings",
    iban: Optional[str] = None,
    swift_code: Optional[str] = None,
    currency: str = "AED"
) -> Dict[str, Any]:
    """
    Register a bank account for a user/admin.

    The first active account of a user+role becomes the default one. New
    accounts start unverified.

    NOTE(review): despite their names, ``account_number_encrypted`` and
    ``iban_encrypted`` currently hold the values exactly as passed in (no
    encryption is applied here). They are stripped from the return value
    but are persisted in the database.

    Args:
        db: Database instance
        user_id: User's ID
        user_role: "investor" or "admin"
        bank_name: Bank name
        account_holder_name: Account holder name
        account_number: Full account number
        account_type: Account type (savings, current, business)
        iban: IBAN (optional)
        swift_code: SWIFT code (optional)
        currency: Currency code (default AED)

    Returns:
        Created bank document without the full account number/IBAN and
        without keys whose value is None

    Raises:
        ValueError: If an active account with the same bank name and last
            four digits already exists for this user+role
    """
    logger.info(f"[REPO] Creating bank account for {user_role} user: {user_id}")
    # Last four characters are kept separately for display purposes.
    account_number_last_four = account_number[-4:]
    iban_last_four = iban[-4:] if iban else None

    collection = banks_col(db)
    owner_oid = _to_object_id(user_id)

    duplicate = collection.find_one({
        "user_id": owner_oid,
        "user_role": user_role,
        "account_number_last_four": account_number_last_four,
        "bank_name": bank_name,
        "is_active": True,
    })
    if duplicate:
        logger.warning(f"[REPO] Bank account already exists for {user_role} user {user_id}")
        raise ValueError("Bank account already exists")

    # The very first active account becomes the default one.
    active_count = collection.count_documents({
        "user_id": owner_oid,
        "user_role": user_role,
        "is_active": True,
    })
    timestamp = _now()
    bank_doc = {
        "user_id": owner_oid,
        "user_role": user_role,
        "bank_name": bank_name,
        "account_holder_name": account_holder_name,
        "account_number_last_four": account_number_last_four,
        "account_number_encrypted": account_number,  # TODO: stored as-is, not encrypted
        "iban_last_four": iban_last_four or None,
        "iban_encrypted": iban or None,  # TODO: stored as-is, not encrypted
        "swift_code": swift_code or None,
        "account_type": account_type,
        "currency": currency,
        "is_default": active_count == 0,
        "is_verified": False,  # Can be verified via microdeposits
        "is_active": True,
        "created_at": timestamp,
        "updated_at": timestamp,
    }
    outcome = collection.insert_one(bank_doc)
    bank_doc["_id"] = outcome.inserted_id

    # Never hand the full account number/IBAN back; drop None values too.
    hidden_keys = ("account_number_encrypted", "iban_encrypted")
    public_view = {
        key: value
        for key, value in bank_doc.items()
        if key not in hidden_keys and value is not None
    }
    logger.info(f"[REPO] Bank account created with ID: {outcome.inserted_id}")
    return _clean_object(public_view)
def get_banks(db, user_id: str, user_role: str) -> List[Dict[str, Any]]:
    """List the active bank accounts registered for a user under a role.

    Documents are returned newest first (sorted on ``created_at``). Each one
    is passed through ``_clean_object`` and then stripped of the stored
    account number / IBAN fields and of any top-level ``None`` values.
    """
    logger.info(f"[REPO] Getting bank accounts for {user_role} user: {user_id}")
    collection = banks_col(db)
    query = {
        "user_id": _to_object_id(user_id),
        "user_role": user_role,
        "is_active": True,
    }
    # Full account identifiers must never leave the repository layer.
    hidden_fields = ("account_number_encrypted", "iban_encrypted")
    cursor = collection.find(query).sort("created_at", DESCENDING)
    return [
        {
            field: value
            for field, value in _clean_object(document).items()
            if field not in hidden_fields and value is not None
        }
        for document in cursor
    ]
def delete_bank(db, user_id: str, user_role: str, bank_id: str) -> bool:
    """Soft delete a user's/admin's bank account.

    The bank document is kept but marked inactive. If it was the default
    bank, its default flag is cleared and handed over to another active bank
    of the same user+role, when one exists.

    Args:
        db: Database handle
        user_id: Owner's ID (string form of an ObjectId)
        user_role: Role the bank is registered under
        bank_id: Bank document ID (string form of an ObjectId)
    Returns:
        True if an active bank owned by this user+role was deactivated,
        False if no such active bank was found
    Raises:
        ValueError: If bank_id or user_id is not a valid ObjectId
    """
    logger.info(f"[REPO] Deleting bank {bank_id} for {user_role} user {user_id}")
    bank_oid = _to_object_id(bank_id)
    user_oid = _to_object_id(user_id)
    collection = banks_col(db)

    # Only active banks are matched, so repeating a delete is a no-op instead
    # of reporting success again and re-running the default promotion.
    # The pre-update document is returned atomically, which tells us whether
    # the deleted bank was the default without a second read.
    previous = collection.find_one_and_update(
        {
            "_id": bank_oid,
            "user_id": user_oid,
            "user_role": user_role,
            "is_active": True,
        },
        {
            "$set": {
                "is_active": False,
                # An inactive bank must not keep the default flag, otherwise
                # two documents end up flagged once another one is promoted.
                "is_default": False,
                "updated_at": _now(),
            }
        },
        return_document=ReturnDocument.BEFORE,
    )
    if previous is None:
        return False

    if previous.get("is_default"):
        # Hand the default flag to another active bank, if any remains.
        next_bank = collection.find_one({
            "user_id": user_oid,
            "user_role": user_role,
            "is_active": True,
        })
        if next_bank:
            collection.update_one(
                {"_id": next_bank["_id"]},
                {"$set": {"is_default": True}},
            )
    return True
def set_default_bank(db, user_id: str, user_role: str, bank_id: str) -> bool:
    """Set a bank account as the default for withdrawals.

    The target bank is flagged first and the previous default is cleared
    only once the target is known to exist. A wrong, inactive or foreign
    bank_id therefore leaves the current default untouched.

    Args:
        db: Database handle
        user_id: Owner's ID (string form of an ObjectId)
        user_role: Role the bank is registered under
        bank_id: Bank document ID (string form of an ObjectId)
    Returns:
        True if the bank is an active bank of this user+role (including when
        it already was the default), False otherwise
    Raises:
        ValueError: If user_id or bank_id is not a valid ObjectId
    """
    logger.info(f"[REPO] Setting bank {bank_id} as default for {user_role} user {user_id}")
    # Resolve both ids before any write so a malformed id cannot leave the
    # user without a default bank.
    user_oid = _to_object_id(user_id)
    bank_oid = _to_object_id(bank_id)
    collection = banks_col(db)

    # Set new default
    result = collection.update_one(
        {
            "_id": bank_oid,
            "user_id": user_oid,
            "user_role": user_role,
            "is_active": True,
        },
        {"$set": {"is_default": True}},
    )
    # matched_count (not modified_count): a bank that is already the default
    # matches without being modified and must still count as success.
    if result.matched_count == 0:
        return False

    # Unset every other default for this user+role
    collection.update_many(
        {
            "user_id": user_oid,
            "user_role": user_role,
            "is_default": True,
            "_id": {"$ne": bank_oid},
        },
        {"$set": {"is_default": False}},
    )
    return True
# Legacy aliases for backward compatibility
def create_user_bank(db, user_id, bank_name, account_holder_name, account_number, account_type="savings", iban=None, swift_code=None, currency="AED"):
    """Legacy alias: forward to ``create_bank`` with the role fixed to "investor"."""
    investor_role = "investor"
    return create_bank(
        db,
        user_id,
        investor_role,
        bank_name,
        account_holder_name,
        account_number,
        account_type,
        iban,
        swift_code,
        currency,
    )
def get_user_banks(db, user_id):
    """Legacy alias: forward to ``get_banks`` with the role fixed to "investor"."""
    investor_role = "investor"
    return get_banks(db, user_id, investor_role)
def delete_user_bank(db, user_id, bank_id):
    """Legacy alias: forward to ``delete_bank`` with the role fixed to "investor"."""
    investor_role = "investor"
    return delete_bank(db, user_id, investor_role, bank_id)