Spaces:
Sleeping
Sleeping
| """ | |
| Rent Distribution Service for Real Estate Tokenization Platform | |
| Handles automated rent distribution to token holders | |
| """ | |
| from typing import Dict, Any, List, Optional | |
| from datetime import datetime | |
| import logging | |
| from pymongo.client_session import ClientSession | |
| import repo | |
| logger = logging.getLogger(__name__) | |
class RentDistributionService:
    """Service to handle rent distribution to token holders.

    Persistence is delegated to the project's ``repo`` module; every call
    receives the database handle that was given to the constructor.
    """

    def __init__(self, db):
        """Keep the database handle that is forwarded to each ``repo`` call."""
        self.db = db

    def distribute_rent(
        self,
        property_id: str,
        total_rent_amount: float,
        rent_period_start: str,
        rent_period_end: str,
        distribution_date: Optional[datetime] = None,
        notes: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Distribute rent to all token holders of a property.

        Steps:
            1. Validate the amount and that the property exists
            2. Get total tokens for the property
            3. Calculate rent per token
            4. Create rent distribution record
            5. Find all investors (token holders)
            6. Inside a single MongoDB transaction, for each investor:
               - Calculate their rent share
               - Credit their wallet
               - Create transaction record
               - Create rent payment record
            7. Update distribution status

        Investors that own no tokens or have no wallet are skipped before
        anything is written for them and are reported as failed. Any other
        error while paying an investor aborts the transaction, so no payment
        of this run is kept and every investor is reported as failed with
        the error "Transaction aborted".

        Args:
            property_id: Property ID generating rent
            total_rent_amount: Total rent collected (AED), must be >= 0
            rent_period_start: Start date (YYYY-MM-DD)
            rent_period_end: End date (YYYY-MM-DD)
            distribution_date: When to distribute (default: now, naive UTC)
            notes: Additional notes

        Returns:
            Dict containing distribution details and results

        Raises:
            ValueError: If the amount is negative or NaN, the property does
                not exist, or the property has no tokens.
        """
        # Reject bad amounts before touching the database. ``not (x >= 0)``
        # is used instead of ``x < 0`` so that NaN is rejected as well.
        if not (total_rent_amount >= 0):
            raise ValueError(f"Invalid total rent amount: {total_rent_amount}")

        logger.info(
            "Starting rent distribution for property %s, amount: %s AED",
            property_id,
            total_rent_amount,
        )

        if distribution_date is None:
            distribution_date = datetime.utcnow()

        # Step 1: Validate property exists
        property_data = repo.get_property_by_id(self.db, property_id)
        if not property_data:
            raise ValueError(f"Property not found: {property_id}")
        property_title = property_data.get("title", "Unknown Property")

        # Step 2: Get total tokens for the property (a stored None counts as 0)
        total_tokens = property_data.get("total_tokens") or 0
        if total_tokens <= 0:
            raise ValueError(f"Property has no tokens: {property_id}")

        # Step 3: Calculate rent per token
        # NOTE(review): shares are computed in binary floating point, so the
        # credited amounts may not sum exactly to total_rent_amount.
        rent_per_token = total_rent_amount / total_tokens
        logger.info(
            "Rent per token: %.6f AED (%s / %s)",
            rent_per_token,
            total_rent_amount,
            total_tokens,
        )

        # Step 4: Create rent distribution record
        distribution = repo.create_rent_distribution(
            db=self.db,
            property_id=property_id,
            total_rent_amount=total_rent_amount,
            rent_period_start=rent_period_start,
            rent_period_end=rent_period_end,
            total_tokens=total_tokens,
            rent_per_token=rent_per_token,
            distribution_date=distribution_date,
            notes=notes,
        )
        distribution_id = distribution["id"]
        logger.info("Created rent distribution: %s", distribution_id)

        # Step 5: Find all investors for this property
        investors = repo.get_investors_by_property(self.db, property_id) or []
        logger.info("Found %d investors for property %s", len(investors), property_id)

        if not investors:
            # No investors, mark as completed
            repo.update_rent_distribution_status(
                db=self.db,
                distribution_id=distribution_id,
                status="completed",
                total_investors=0,
                payments_completed=0,
            )
            return {
                "distribution_id": distribution_id,
                "property_title": property_title,
                "total_rent_amount": total_rent_amount,
                "total_investors": 0,
                "payments_completed": 0,
                "payments_failed": 0,
                "status": "completed",
                "message": "No investors found for this property",
            }

        # Update status to processing
        repo.update_rent_distribution_status(
            db=self.db,
            distribution_id=distribution_id,
            status="processing",
            total_investors=len(investors),
            payments_completed=0,
        )

        # Step 6: Process each investor WITH TRANSACTION
        # All writes that receive ``session`` are committed together when the
        # ``start_transaction`` block exits normally and are aborted when it
        # exits with an exception.
        logger.info("Starting to process %d investors (with transaction)", len(investors))

        # Imported at call time, as in the original code.
        from db import get_client
        client = get_client()

        payment_results: List[Dict[str, Any]] = []
        try:
            with client.start_session() as session:
                with session.start_transaction():
                    for investment in investors:
                        # Resolved outside the try so the error log below
                        # always names the investor that actually failed.
                        user_id = investment.get("user_id")
                        try:
                            payment_results.append(
                                self._pay_investor(
                                    investment=investment,
                                    property_id=property_id,
                                    distribution_id=distribution_id,
                                    rent_per_token=rent_per_token,
                                    rent_period_start=rent_period_start,
                                    rent_period_end=rent_period_end,
                                    session=session,
                                )
                            )
                        except Exception:
                            logger.exception(
                                "Failed to process rent payment for user %s", user_id
                            )
                            # Always re-raise: swallowing the error here could
                            # commit a wallet credit without its transaction
                            # and rent payment records.
                            raise
            # Reached only after the transaction was committed.
            logger.info(
                "Transaction completed: %d payments processed",
                sum(1 for result in payment_results if result["status"] == "success"),
            )
        except Exception as tx_error:
            # Transaction was aborted (or could not be committed): none of
            # the session-bound writes of this run are kept.
            logger.error(
                "Transaction aborted, all changes rolled back: %s",
                tx_error,
                exc_info=True,
            )
            payment_results = [
                {
                    "user_id": inv.get("user_id"),
                    "status": "failed",
                    "error": "Transaction aborted",
                }
                for inv in investors
            ]

        payments_completed = sum(
            1 for result in payment_results if result["status"] == "success"
        )
        payments_failed = len(payment_results) - payments_completed
        logger.info(
            "Investor processing complete: %d succeeded, %d failed",
            payments_completed,
            payments_failed,
        )

        # Step 7: Update distribution status to completed
        # NOTE(review): a fully rolled-back run is still recorded as
        # "completed_with_errors" to keep the status values callers already see.
        final_status = "completed" if payments_failed == 0 else "completed_with_errors"
        repo.update_rent_distribution_status(
            db=self.db,
            distribution_id=distribution_id,
            status=final_status,
            total_investors=len(investors),
            payments_completed=payments_completed,
        )
        logger.info(
            "Rent distribution completed: %d succeeded, %d failed",
            payments_completed,
            payments_failed,
        )

        return {
            "distribution_id": distribution_id,
            "property_id": property_id,
            "property_title": property_title,
            "total_rent_amount": total_rent_amount,
            "rent_per_token": rent_per_token,
            "total_investors": len(investors),
            "payments_completed": payments_completed,
            "payments_failed": payments_failed,
            "status": final_status,
            "rent_period": f"{rent_period_start} to {rent_period_end}",
            "payment_results": payment_results,
        }

    def _pay_investor(
        self,
        investment: Dict[str, Any],
        property_id: str,
        distribution_id: Any,
        rent_per_token: float,
        rent_period_start: str,
        rent_period_end: str,
        session: Any,
    ) -> Dict[str, Any]:
        """Credit one investor's rent share using the caller's open session.

        Returns the per-investor entry for ``payment_results``. An investor
        without tokens or without a wallet yields a "failed" entry and
        nothing is written. Any exception raised by a ``repo`` call is left
        to propagate so that the caller's transaction is aborted.
        """
        user_id = investment.get("user_id")
        # A stored None is treated like 0 tokens instead of raising TypeError.
        tokens_owned = investment.get("tokens_purchased") or 0
        logger.info(
            "Processing investor: user_id=%s, tokens_purchased=%s",
            user_id,
            tokens_owned,
        )

        if tokens_owned <= 0:
            logger.warning("User %s has 0 tokens, skipping", user_id)
            return {"user_id": user_id, "status": "failed", "error": "No tokens owned"}

        # Calculate rent share
        rent_share = tokens_owned * rent_per_token
        logger.info(
            "User %s: %s tokens × %.6f = %.2f AED",
            user_id,
            tokens_owned,
            rent_per_token,
            rent_share,
        )

        # Look up the wallet to credit
        wallet = repo.get_wallet_by_user(self.db, user_id)
        if not wallet:
            logger.error("Wallet not found for user %s", user_id)
            return {"user_id": user_id, "status": "failed", "error": "Wallet not found"}
        wallet_id = wallet["id"]

        # Update wallet balance (with session for transaction)
        updated_wallet = repo.update_wallet_balance(
            self.db, wallet_id, rent_share, operation="add", session=session
        )
        new_balance = updated_wallet.get("balance") if updated_wallet else "unknown"
        logger.info(
            "Credited %.2f AED to wallet %s, new balance: %s",
            rent_share,
            wallet_id,
            new_balance,
        )

        # Create transaction record (with session)
        transaction = repo.create_transaction(
            db=self.db,
            user_id=user_id,
            wallet_id=wallet_id,
            tx_type="profit",  # Rent is a profit distribution
            amount=rent_share,
            property_id=property_id,
            status="completed",
            metadata={
                "type": "rent_distribution",
                "distribution_id": distribution_id,
                "tokens_owned": tokens_owned,
                "rent_per_token": rent_per_token,
                "rent_period": f"{rent_period_start} to {rent_period_end}",
            },
            session=session,
        )
        transaction_id = transaction["id"]
        logger.info("Created transaction %s for rent payment", transaction_id)

        # Create rent payment record (with session)
        repo.create_rent_payment(
            db=self.db,
            distribution_id=distribution_id,
            user_id=user_id,
            property_id=property_id,
            tokens_owned=tokens_owned,
            rent_amount=rent_share,
            rent_period_start=rent_period_start,
            rent_period_end=rent_period_end,
            payment_status="completed",
            wallet_credited=True,
            transaction_id=transaction_id,
            session=session,
        )
        logger.info("Successfully processed rent payment for user %s", user_id)

        return {
            "user_id": user_id,
            "tokens_owned": tokens_owned,
            "rent_amount": rent_share,
            "status": "success",
        }

    def get_distribution_details(self, distribution_id: str) -> Optional[Dict[str, Any]]:
        """Get details of a rent distribution.

        Returns the record from ``repo.get_rent_distribution_by_id`` with a
        ``property_title`` key added when the referenced property is found,
        or ``None`` when the distribution does not exist.
        """
        distribution = repo.get_rent_distribution_by_id(self.db, distribution_id)
        if not distribution:
            return None

        # Enrich with property title
        property_id = distribution.get("property_id")
        if property_id:
            property_data = repo.get_property_by_id(self.db, property_id)
            if property_data:
                distribution["property_title"] = property_data.get("title")
        return distribution