# atriumchain-api/services/rent_distribution_service.py
# Uploaded by Jainish1808 using huggingface_hub (commit 4e4664a, verified).
"""
Rent Distribution Service for Real Estate Tokenization Platform
Handles automated rent distribution to token holders
"""
from typing import Dict, Any, List, Optional
from datetime import datetime
import logging
from pymongo.client_session import ClientSession
import repo
# Module-level logger named after this module (via __name__).
logger = logging.getLogger(__name__)
class RentDistributionService:
    """Service to handle rent distribution to token holders.

    All persistence goes through the ``repo`` module; the ``db`` handle given
    to the constructor is passed to every ``repo`` call.
    """

    def __init__(self, db):
        """Store the database handle that is forwarded to ``repo`` helpers."""
        self.db = db

    def distribute_rent(
        self,
        property_id: str,
        total_rent_amount: float,
        rent_period_start: str,
        rent_period_end: str,
        distribution_date: Optional[datetime] = None,
        notes: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Distribute rent to all token holders of a property.

        Steps:
            1. Validate property exists
            2. Get total tokens for the property
            3. Calculate rent per token
            4. Create rent distribution record
            5. Find all investors (token holders)
            6. For each investor (inside one database transaction):
               - Calculate their rent share
               - Credit their wallet
               - Create transaction record
               - Create rent payment record
            7. Update distribution status

        Investors without tokens or without a wallet are skipped (reported as
        failed) without writing anything. Any exception raised while paying an
        investor aborts the whole transaction, so no partially written payment
        is ever committed; in that case every investor is reported as failed
        with the error "Transaction aborted".

        Args:
            property_id: Property ID generating rent
            total_rent_amount: Total rent collected (AED); must not be negative
            rent_period_start: Start date (YYYY-MM-DD)
            rent_period_end: End date (YYYY-MM-DD)
            distribution_date: When to distribute (default: now, naive UTC)
            notes: Additional notes

        Returns:
            Dict containing distribution details and results

        Raises:
            ValueError: If the rent amount is negative, the property does not
                exist, or the property has no tokens.
        """
        # Reject negative rent before any record is written: the amount is
        # applied to wallets with operation="add", so a negative value would
        # reduce balances instead of crediting them.
        if total_rent_amount < 0:
            raise ValueError(
                f"Total rent amount cannot be negative: {total_rent_amount}"
            )

        logger.info(f"Starting rent distribution for property {property_id}, amount: {total_rent_amount} AED")

        if distribution_date is None:
            distribution_date = datetime.utcnow()

        # Step 1: Validate property exists
        property_data = repo.get_property_by_id(self.db, property_id)
        if not property_data:
            raise ValueError(f"Property not found: {property_id}")
        property_title = property_data.get("title", "Unknown Property")

        # Step 2: Get total tokens for the property
        # A missing or None value is treated as 0 so it reports "no tokens"
        # instead of failing on the comparison below.
        total_tokens = property_data.get("total_tokens") or 0
        if total_tokens <= 0:
            raise ValueError(f"Property has no tokens: {property_id}")

        # Step 3: Calculate rent per token
        rent_per_token = total_rent_amount / total_tokens
        logger.info(f"Rent per token: {rent_per_token:.6f} AED ({total_rent_amount} / {total_tokens})")

        # Step 4: Create rent distribution record
        distribution = repo.create_rent_distribution(
            db=self.db,
            property_id=property_id,
            total_rent_amount=total_rent_amount,
            rent_period_start=rent_period_start,
            rent_period_end=rent_period_end,
            total_tokens=total_tokens,
            rent_per_token=rent_per_token,
            distribution_date=distribution_date,
            notes=notes
        )
        distribution_id = distribution["id"]
        logger.info(f"Created rent distribution: {distribution_id}")

        # Step 5: Find all investors for this property
        investors = repo.get_investors_by_property(self.db, property_id)
        logger.info(f"Found {len(investors)} investors for property {property_id}")

        if not investors:
            # No investors, mark as completed
            repo.update_rent_distribution_status(
                db=self.db,
                distribution_id=distribution_id,
                status="completed",
                total_investors=0,
                payments_completed=0
            )
            return {
                "distribution_id": distribution_id,
                "property_title": property_title,
                "total_rent_amount": total_rent_amount,
                "total_investors": 0,
                "payments_completed": 0,
                "payments_failed": 0,
                "status": "completed",
                "message": "No investors found for this property"
            }

        # Update status to processing
        repo.update_rent_distribution_status(
            db=self.db,
            distribution_id=distribution_id,
            status="processing",
            total_investors=len(investors),
            payments_completed=0
        )

        # Step 6: Process each investor WITH TRANSACTION
        # Using MongoDB transaction for atomicity - all payments succeed or all fail
        payments_completed = 0
        payments_failed = 0
        payment_results: List[Dict[str, Any]] = []
        logger.info(f"Starting to process {len(investors)} investors (with transaction)")

        # Get MongoDB client for transaction (imported at call time)
        from db import get_client
        client = get_client()

        try:
            with client.start_session() as session:
                with session.start_transaction():
                    for investment in investors:
                        # _pay_investor raises on any error, which leaves the
                        # start_transaction() block with an exception so that
                        # nothing written so far is committed.
                        result = self._pay_investor(
                            investment,
                            property_id=property_id,
                            distribution_id=distribution_id,
                            rent_per_token=rent_per_token,
                            rent_period_start=rent_period_start,
                            rent_period_end=rent_period_end,
                            session=session
                        )
                        payment_results.append(result)
                        if result["status"] == "success":
                            payments_completed += 1
                        else:
                            payments_failed += 1
            # If we reach here, the transaction block exited without error
            logger.info(f"Transaction completed: {payments_completed} payments processed")
        except Exception as tx_error:
            # Transaction was aborted - all changes rolled back
            logger.error(f"Transaction aborted, all changes rolled back: {tx_error}")
            payments_failed = len(investors)
            payments_completed = 0
            payment_results = [
                {"user_id": inv.get("user_id"), "status": "failed", "error": "Transaction aborted"}
                for inv in investors
            ]

        logger.info(f"Investor processing complete: {payments_completed} succeeded, {payments_failed} failed")

        # Step 7: Update distribution status to completed
        final_status = "completed" if payments_failed == 0 else "completed_with_errors"
        repo.update_rent_distribution_status(
            db=self.db,
            distribution_id=distribution_id,
            status=final_status,
            total_investors=len(investors),
            payments_completed=payments_completed
        )
        logger.info(f"Rent distribution completed: {payments_completed} succeeded, {payments_failed} failed")

        return {
            "distribution_id": distribution_id,
            "property_id": property_id,
            "property_title": property_title,
            "total_rent_amount": total_rent_amount,
            "rent_per_token": rent_per_token,
            "total_investors": len(investors),
            "payments_completed": payments_completed,
            "payments_failed": payments_failed,
            "status": final_status,
            "rent_period": f"{rent_period_start} to {rent_period_end}",
            "payment_results": payment_results
        }

    def _pay_investor(
        self,
        investment: Dict[str, Any],
        property_id: str,
        distribution_id: str,
        rent_per_token: float,
        rent_period_start: str,
        rent_period_end: str,
        session
    ) -> Dict[str, Any]:
        """Pay one investor's rent share using the caller's open session.

        Returns a payment-result dict whose "status" is "success", or "failed"
        when the investor is skipped before anything is written (no tokens,
        no wallet).

        Raises:
            Exception: Any error is logged and re-raised unchanged so the
                caller's transaction is aborted rather than committed with a
                half-applied payment.
        """
        # Defined before the try block so the error log below can always
        # reference it, even if reading the investment record itself fails.
        user_id = "unknown"
        try:
            user_id = investment.get("user_id")
            # None is treated as 0 so it is skipped like any empty holding
            tokens_owned = investment.get("tokens_purchased") or 0
            logger.info(f"Processing investor: user_id={user_id}, tokens_purchased={tokens_owned}")

            if tokens_owned <= 0:
                logger.warning(f"User {user_id} has 0 tokens, skipping")
                return {
                    "user_id": user_id,
                    "status": "failed",
                    "error": "No tokens owned"
                }

            # Calculate rent share
            rent_share = tokens_owned * rent_per_token
            logger.info(f"User {user_id}: {tokens_owned} tokens × {rent_per_token:.6f} = {rent_share:.2f} AED")

            # Credit wallet
            wallet = repo.get_wallet_by_user(self.db, user_id)
            if not wallet:
                logger.error(f"Wallet not found for user {user_id}")
                return {
                    "user_id": user_id,
                    "status": "failed",
                    "error": "Wallet not found"
                }
            wallet_id = wallet["id"]

            # Update wallet balance (with session for transaction)
            updated_wallet = repo.update_wallet_balance(
                self.db, wallet_id, rent_share, operation="add", session=session
            )
            logger.info(f"Credited {rent_share:.2f} AED to wallet {wallet_id}, new balance: {updated_wallet.get('balance') if updated_wallet else 'unknown'}")

            # Create transaction record (with session)
            transaction = repo.create_transaction(
                db=self.db,
                user_id=user_id,
                wallet_id=wallet_id,
                tx_type="profit",  # Rent is a profit distribution
                amount=rent_share,
                property_id=property_id,
                status="completed",
                metadata={
                    "type": "rent_distribution",
                    "distribution_id": distribution_id,
                    "tokens_owned": tokens_owned,
                    "rent_per_token": rent_per_token,
                    "rent_period": f"{rent_period_start} to {rent_period_end}"
                },
                session=session
            )
            transaction_id = transaction["id"]
            logger.info(f"Created transaction {transaction_id} for rent payment")

            # Create rent payment record (with session)
            repo.create_rent_payment(
                db=self.db,
                distribution_id=distribution_id,
                user_id=user_id,
                property_id=property_id,
                tokens_owned=tokens_owned,
                rent_amount=rent_share,
                rent_period_start=rent_period_start,
                rent_period_end=rent_period_end,
                payment_status="completed",
                wallet_credited=True,
                transaction_id=transaction_id,
                session=session
            )
            logger.info(f"Successfully processed rent payment for user {user_id}")
            return {
                "user_id": user_id,
                "tokens_owned": tokens_owned,
                "rent_amount": rent_share,
                "status": "success"
            }
        except Exception as e:
            # Always re-raise: swallowing the error here could let a wallet
            # credit be committed without its transaction/payment records.
            logger.error(f"Failed to process rent payment for user {user_id}: {e}", exc_info=True)
            raise

    def get_distribution_details(self, distribution_id: str) -> Optional[Dict[str, Any]]:
        """Get details of a rent distribution.

        Returns the distribution record from ``repo``, with a
        "property_title" key added when the referenced property is found, or
        None when no distribution has the given ID.
        """
        distribution = repo.get_rent_distribution_by_id(self.db, distribution_id)
        if not distribution:
            return None

        # Enrich with property title
        property_id = distribution.get("property_id")
        if property_id:
            property_data = repo.get_property_by_id(self.db, property_id)
            if property_data:
                distribution["property_title"] = property_data.get("title")

        return distribution