Spaces:
Sleeping
Sleeping
| """ | |
| Production Database Initialization | |
| Sets up MongoDB with proper indexes and collections | |
| """ | |
| from typing import Optional | |
| from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT | |
| from pymongo.errors import CollectionInvalid | |
| from config import settings | |
| from utils.crypto_utils import encrypt_secret, is_encrypted | |
| import base64 | |
| _client: Optional[MongoClient] = None | |
def _mask_mongo_uri(uri) -> str:
    """Return *uri* with any ``user[:password]@`` part replaced by asterisks.

    The split is made on the LAST ``@`` so that a password containing an
    unescaped ``@`` can never spill into the portion that gets logged.
    Any failure while masking (e.g. a non-string value) yields ``"<redacted>"``
    rather than risking a leak.
    """
    try:
        scheme, scheme_sep, rest = uri.partition("://")
        if not scheme_sep:
            return uri
        creds, at_sign, host = rest.rpartition("@")
        if not at_sign:
            return uri
        masked_creds = "***:***" if ":" in creds else "***"
        return f"{scheme}://{masked_creds}@{host}"
    except Exception:
        return "<redacted>"


def get_client() -> MongoClient:
    """Get MongoDB client (singleton) with connection pooling.

    On first use a client is built from ``settings.MONGODB_URI``, verified
    with a ``ping`` against the admin database, and only then stored in the
    module-level ``_client``. Later calls return the stored client.

    Returns:
        The shared, ping-verified MongoClient.

    Raises:
        Whatever ``MongoClient(...)`` or the ``ping`` command raises. In that
        case nothing is cached, so the next call retries from scratch.
    """
    global _client
    if _client is not None:
        return _client

    # Credentials are masked before the URI reaches the log
    print(f"[DB] Connecting to MongoDB: {_mask_mongo_uri(settings.MONGODB_URI)}")

    # Connection pool configuration for production workloads
    client = MongoClient(
        settings.MONGODB_URI,
        maxPoolSize=50,                 # Maximum connections in pool
        minPoolSize=5,                  # Minimum connections to maintain
        maxIdleTimeMS=30000,            # Close idle connections after 30s
        serverSelectionTimeoutMS=5000,  # Fast fail on connection issues
        connectTimeoutMS=10000,         # Connection timeout
        retryWrites=True,               # Auto-retry on transient errors
    )

    # Test the connection BEFORE publishing the client as the singleton, so a
    # failed ping never leaves an unverified client cached for later callers.
    try:
        client.admin.command('ping')
    except Exception:
        client.close()
        raise

    _client = client
    print("[DB] [SUCCESS] MongoDB connection successful (pool: 5-50)")
    return _client
def get_db():
    """Return the application database from the shared MongoDB client.

    The database is looked up on the client by the name held in
    ``settings.MONGODB_DB_NAME``.
    """
    mongo_client = get_client()
    database_name = settings.MONGODB_DB_NAME
    return mongo_client[database_name]
def get_mongo():
    """Yield the database handle (generator form, for FastAPI dependency injection).

    Yields:
        The object returned by ``get_db()``.
    """
    # Nothing to tear down after the yield: connection pooling handles cleanup.
    yield get_db()
def get_next_sequence(db, name: str) -> int:
    """
    Increment the named counter and return its new value.

    The counter lives in the ``counters`` collection as a document whose
    ``_id`` is the sequence name and whose ``seq`` field holds the current
    value. The document is created on first use (upsert).

    Args:
        db: Database connection
        name: Name of the sequence (e.g., 'property_id', 'user_id')

    Returns:
        The counter value after the increment, as an int
    """
    from pymongo import ReturnDocument

    selector = {"_id": name}
    bump = {"$inc": {"seq": 1}}
    # ReturnDocument.AFTER: hand back the document as it looks post-increment
    counter_doc = db.counters.find_one_and_update(
        selector,
        bump,
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    next_value = counter_doc.get("seq", 1)
    return int(next_value)
def create_collections(db):
    """Create every required collection that the database does not have yet.

    Args:
        db: Database object exposing ``list_collection_names()`` and
            ``create_collection(name)``.
    """
    print("[DB] Creating collections...")

    required = (
        'users', 'wallets', 'properties', 'property_specifications',
        'amenities', 'property_images', 'investments', 'transactions',
        'portfolios', 'documents', 'secondary_market_transactions',
        'otps', 'sessions', 'certificates', 'rent_distributions',
        'rent_payments', 'counters', 'tokens', 'funded_properties',
    )
    already_present = set(db.list_collection_names())

    for collection_name in required:
        if collection_name in already_present:
            continue
        try:
            db.create_collection(collection_name)
        except CollectionInvalid:
            # Collection already exists (e.g. created since the listing above)
            continue
        print(f"[DB] [SUCCESS] Created collection: {collection_name}")
def ensure_indexes(db):
    """Create all database indexes for performance and uniqueness.

    The indexes are described as a plan: one entry per collection holding the
    collection's attribute name on ``db``, the message printed once its
    indexes are done, and the ordered list of index specifications. Each
    specification becomes one ``create_index(keys, **options)`` call.

    The ``expires_at`` indexes on ``otps`` and ``sessions`` are TTL indexes
    (``expireAfterSeconds=0``).

    Args:
        db: Database object whose collections are reachable as attributes.
    """
    print("[DB] Creating indexes...")

    def asc(field):
        return (field, ASCENDING)

    def desc(field):
        return (field, DESCENDING)

    def plain(index_name, *keys):
        return list(keys), {"name": index_name}

    def unique(index_name, *keys):
        return list(keys), {"unique": True, "name": index_name}

    def ttl(index_name, *keys):
        return list(keys), {"name": index_name, "expireAfterSeconds": 0}

    plan = [
        # USERS COLLECTION
        ("users", "[DB] [SUCCESS] Users indexes created", [
            unique("idx_email_unique", asc("email")),
            plain("idx_role", asc("role")),
            plain("idx_is_active", asc("is_active")),
            plain("idx_deleted", asc("deleted")),
            plain("idx_created_at", desc("created_at")),
        ]),
        # OTPs COLLECTION (for email verification)
        ("otps", "[DB] [SUCCESS] OTPs indexes created (with TTL)", [
            plain("idx_email_purpose", asc("email"), asc("purpose")),
            # TTL index: automatically deletes OTPs after they expire
            ttl("idx_expires_at_ttl", asc("expires_at")),
            plain("idx_verified", asc("verified")),
            plain("idx_created_at", desc("created_at")),
        ]),
        # WALLETS COLLECTION
        ("wallets", "[DB] [SUCCESS] Wallets indexes created", [
            unique("idx_user_id_unique", asc("user_id")),
            plain("idx_is_active", asc("is_active")),
        ]),
        # PROPERTIES COLLECTION
        ("properties", "[DB] [SUCCESS] Properties indexes created", [
            plain("idx_title_text", ("title", TEXT)),
            plain("idx_location", asc("location")),
            plain("idx_property_type", asc("property_type")),
            plain("idx_is_active", asc("is_active")),
            plain("idx_deleted", asc("deleted")),
            plain("idx_created_by", asc("created_by")),
            plain("idx_created_at", desc("created_at")),
            plain("idx_funded_date", desc("funded_date")),
            plain("idx_available_tokens", asc("available_tokens")),
        ]),
        # PROPERTY SPECIFICATIONS COLLECTION
        ("property_specifications", "[DB] [SUCCESS] Property specifications indexes created", [
            unique("idx_property_id_unique", asc("property_id")),
        ]),
        # AMENITIES COLLECTION
        ("amenities", "[DB] [SUCCESS] Amenities indexes created", [
            plain("idx_property_id", asc("property_id")),
            plain("idx_is_active", asc("is_active")),
        ]),
        # PROPERTY IMAGES COLLECTION
        ("property_images", "[DB] [SUCCESS] Property images indexes created", [
            plain("idx_property_id", asc("property_id")),
            plain("idx_is_main", asc("is_main")),
            plain("idx_is_active", asc("is_active")),
        ]),
        # INVESTMENTS COLLECTION
        ("investments", "[DB] [SUCCESS] Investments indexes created", [
            plain("idx_user_id", asc("user_id")),
            plain("idx_property_id", asc("property_id")),
            plain("idx_user_property", asc("user_id"), asc("property_id")),
            plain("idx_status", asc("status")),
            plain("idx_created_at", desc("created_at")),
        ]),
        # TRANSACTIONS COLLECTION
        ("transactions", "[DB] [SUCCESS] Transactions indexes created", [
            plain("idx_user_id", asc("user_id")),
            plain("idx_wallet_id", asc("wallet_id")),
            plain("idx_property_id", asc("property_id")),
            plain("idx_type", asc("type")),
            plain("idx_status", asc("status")),
            plain("idx_created_at", desc("created_at")),
            plain("idx_user_type", asc("user_id"), asc("type")),
        ]),
        # PORTFOLIOS COLLECTION
        ("portfolios", "[DB] [SUCCESS] Portfolios indexes created", [
            unique("idx_user_id_unique", asc("user_id")),
            plain("idx_updated_at", desc("updated_at")),
        ]),
        # DOCUMENTS COLLECTION
        ("documents", "[DB] [SUCCESS] Documents indexes created", [
            plain("idx_property_id", asc("property_id")),
            plain("idx_file_type", asc("file_type")),
            plain("idx_uploaded_by", asc("uploaded_by")),
        ]),
        # SECONDARY MARKET TRANSACTIONS COLLECTION
        ("secondary_market_transactions", "[DB] [SUCCESS] Secondary market transactions indexes created", [
            unique("idx_transaction_id_unique", asc("transaction_id")),
            plain("idx_seller_id", asc("seller_id")),
            plain("idx_buyer_id", asc("buyer_id")),
            plain("idx_property_id", asc("property_id")),
            plain("idx_status", asc("status")),
            plain("idx_transaction_type", asc("transaction_type")),
            plain("idx_initiated_at", desc("initiated_at")),
            plain("idx_seller_status", asc("seller_id"), asc("status")),
            plain("idx_property_status", asc("property_id"), asc("status")),
            plain("idx_blockchain_tx", asc("blockchain_tx_hash")),
        ]),
        # SESSIONS COLLECTION (Device-bound Authentication)
        ("sessions", "[DB] [SUCCESS] Sessions indexes created (with TTL)", [
            unique("idx_session_id_unique", asc("session_id")),
            plain("idx_user_id", asc("user_id")),
            ttl("idx_expires_at_ttl", asc("expires_at")),
            plain("idx_user_device", asc("user_id"), asc("device_fingerprint")),
            plain("idx_is_active", asc("is_active")),
        ]),
        # CERTIFICATES COLLECTION (Ownership Certificates)
        ("certificates", "[DB] [SUCCESS] Certificates indexes created", [
            unique("idx_certificate_id_unique", asc("certificate_id")),
            plain("idx_user_id", asc("user_id")),
            plain("idx_user_property", asc("user_id"), asc("property_details.property_id")),
            plain("idx_issued_date", asc("issued_date")),
            plain("idx_is_valid", asc("is_valid")),
        ]),
        # RENT DISTRIBUTIONS COLLECTION
        ("rent_distributions", "[DB] [SUCCESS] Rent distributions indexes created", [
            plain("idx_property_id", asc("property_id")),
            plain("idx_distribution_date", desc("distribution_date")),
            plain("idx_status", asc("status")),
            plain("idx_created_at", desc("created_at")),
            plain("idx_property_status", asc("property_id"), asc("status")),
        ]),
        # RENT PAYMENTS COLLECTION
        ("rent_payments", "[DB] [SUCCESS] Rent payments indexes created", [
            plain("idx_distribution_id", asc("distribution_id")),
            plain("idx_user_id", asc("user_id")),
            plain("idx_property_id", asc("property_id")),
            plain("idx_user_property", asc("user_id"), asc("property_id")),
            plain("idx_payment_date", desc("payment_date")),
            plain("idx_payment_status", asc("payment_status")),
        ]),
        # TOKENS COLLECTION (Property Tokens)
        ("tokens", "[DB] [SUCCESS] Tokens indexes created", [
            unique("idx_property_id_unique", asc("property_id")),
            plain("idx_is_active", asc("is_active")),
        ]),
        # FUNDED PROPERTIES COLLECTION (Fully Funded Properties Archive)
        ("funded_properties", "[DB] [SUCCESS] Funded properties indexes created", [
            unique("idx_property_id_unique", asc("property_id")),
            plain("idx_funded_date", desc("funded_date")),
            plain("idx_funding_status", asc("funding_status")),
            plain("idx_is_active", asc("is_active")),
            plain("idx_created_at", desc("created_at")),
        ]),
    ]

    for collection_attr, done_message, index_specs in plan:
        for keys, options in index_specs:
            # Resolve the collection per call, exactly as ``db.<name>`` would
            getattr(db, collection_attr).create_index(keys, **options)
        print(done_message)

    print("[DB] ✅ All indexes created successfully")
def _encrypt_legacy_wallet_seeds(db):
    """Best-effort migration of unencrypted ``xrp_seed`` values to encrypted form.

    Does nothing unless ``settings.encryption_enabled()`` is truthy. Any
    exception is reported as a warning and swallowed, so a failed migration
    never aborts database initialization.
    """
    try:
        if not settings.encryption_enabled():
            return

        seeded_wallets = db.wallets.find({"xrp_seed": {"$exists": True, "$ne": None}})
        migrated_count = 0
        for wallet in seeded_wallets:
            stored_seed = wallet.get("xrp_seed")
            if not stored_seed or is_encrypted(stored_seed):
                continue

            # Detect base64 vs plain: attempt decode. The decoded text is used
            # only when it starts with "s"; otherwise the stored value is kept.
            # NOTE(review): presumably "s" is the expected seed prefix - confirm.
            plaintext = stored_seed
            try:
                maybe_decoded = base64.b64decode(stored_seed).decode()
                if maybe_decoded.startswith("s"):
                    plaintext = maybe_decoded
            except Exception:
                plaintext = stored_seed

            encrypted_seed = encrypt_secret(plaintext, settings.ENCRYPTION_KEY)
            db.wallets.update_one(
                {"_id": wallet["_id"]},
                {"$set": {"xrp_seed": encrypted_seed}},
            )
            migrated_count += 1

        if migrated_count:
            print(f"[DB] [SUCCESS] Migrated {migrated_count} wallet seeds to encrypted format")
    except Exception as mig_err:
        print(f"[DB][WARNING] Seed encryption migration skipped: {mig_err}")


def init_mongo(app=None):
    """Initialize the MongoDB database and return its handle.

    Steps, in order: obtain the database, create missing collections, create
    indexes, run the optional wallet-seed encryption migration, and - when
    ``app`` is truthy - store the database on ``app.state.mongo``.

    Args:
        app: Optional application object (FastAPI, per the log message) to
            attach the database to.

    Returns:
        The database object.

    Raises:
        Any exception from the steps above other than the seed migration;
        it is printed with a traceback and then re-raised.
    """
    banner = "=" * 80
    print("\n" + banner)
    print("INITIALIZING MONGODB DATABASE")
    print(banner + "\n")

    try:
        db = get_db()

        # Create collections, then indexes
        create_collections(db)
        ensure_indexes(db)

        # Optional: migrate legacy wallet seeds to encrypted form if encryption enabled
        _encrypt_legacy_wallet_seeds(db)

        # Attach to FastAPI app if provided
        if app:
            app.state.mongo = db
            print("[DB] [SUCCESS] Database attached to FastAPI app")

        print("\n[DB] ✅ Database initialization complete!")
        print(banner + "\n")
        return db
    except Exception as e:
        error_text = str(e)
        print(f"\n[DB] [ERROR] ERROR during initialization: {error_text}")
        import traceback
        traceback.print_exc()
        raise
def close_mongo():
    """Close the shared MongoDB client, if one exists, and clear the singleton."""
    global _client
    if not _client:
        # Nothing was ever opened (or it was already closed)
        return
    _client.close()
    _client = None
    print("[DB] Connection closed")
| # Optional: Database health check | |
def check_db_health():
    """Report whether the database is reachable.

    Returns:
        On success, a dict with ``status`` = ``"healthy"``, the configured
        database name, the number of collections and their names.
        On any exception, a dict with ``status`` = ``"unhealthy"`` and the
        exception text under ``error``. This function never raises.
    """
    try:
        # Ping the server first, then list collections in the configured database
        get_client().admin.command('ping')
        names = get_db().list_collection_names()
        report = {
            "status": "healthy",
            "database": settings.MONGODB_DB_NAME,
            "collections": len(names),
            "collection_names": names,
        }
    except Exception as exc:
        report = {
            "status": "unhealthy",
            "error": str(exc),
        }
    return report