File size: 19,172 Bytes
4e4664a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
"""
Production Database Initialization
Sets up MongoDB with proper indexes and collections
"""
from typing import Optional
from pymongo import MongoClient, ASCENDING, DESCENDING, TEXT
from pymongo.errors import CollectionInvalid
from config import settings
from utils.crypto_utils import encrypt_secret, is_encrypted
import base64


# Module-wide MongoClient slot: stays None until get_client() creates the
# client, and is set back to None by close_mongo().
_client: Optional[MongoClient] = None


def _mask_mongo_uri(uri) -> str:
    """Return *uri* with any ``user[:password]@`` part replaced by asterisks.

    The credentials are located by the LAST "@" that appears before the query
    string, so a password containing an unescaped "@" is masked in full and an
    "@" inside a query option is not mistaken for a credentials separator.
    Anything that cannot be processed is reported as ``"<redacted>"``.
    """
    try:
        scheme, sep, rest = uri.partition("://")
        if not sep:
            return uri
        location, qmark, query = rest.partition("?")
        creds, at, host = location.rpartition("@")
        if not at:
            # No credentials present: nothing to hide.
            return uri
        masked_creds = "***:***" if ":" in creds else "***"
        return f"{scheme}://{masked_creds}@{host}{qmark}{query}"
    except Exception:
        # Best effort only: never let log formatting expose or break anything.
        return "<redacted>"


def get_client() -> MongoClient:
    """Get MongoDB client (singleton) with connection pooling.

    On first use a ``MongoClient`` is built from ``settings.MONGODB_URI`` and
    verified with a ``ping`` command. The client is stored in the module-level
    ``_client`` only after the ping succeeds; if the ping fails the new client
    is closed and the exception propagates, so the next call retries from
    scratch instead of handing out an unverified client.

    Returns:
        The shared, already-pinged ``MongoClient``.

    Raises:
        Any exception raised by the ``MongoClient`` constructor or by the
        ``ping`` command is propagated unchanged.
    """
    global _client
    if _client is None:
        # Credentials are masked before the URI is written to the log.
        print(f"[DB] Connecting to MongoDB: {_mask_mongo_uri(settings.MONGODB_URI)}")

        # Connection pool configuration for production workloads
        client = MongoClient(
            settings.MONGODB_URI,
            maxPoolSize=50,           # Maximum connections in pool
            minPoolSize=5,            # Minimum connections to maintain
            maxIdleTimeMS=30000,      # Close idle connections after 30s
            serverSelectionTimeoutMS=5000,  # Fast fail on connection issues
            connectTimeoutMS=10000,   # Connection timeout
            retryWrites=True,         # Auto-retry on transient errors
        )
        # Test connection before publishing the client as the singleton.
        try:
            client.admin.command('ping')
        except Exception:
            client.close()
            raise
        _client = client
        print("[DB] [SUCCESS] MongoDB connection successful (pool: 5-50)")
    return _client


def get_db():
    """Return the database named by ``settings.MONGODB_DB_NAME``.

    The handle is looked up on the shared client returned by ``get_client()``.
    """
    return get_client()[settings.MONGODB_DB_NAME]


def get_mongo():
    """Generator dependency (FastAPI style) that yields the database handle.

    No teardown runs after the ``yield``: the handle comes from the shared
    pooled client, which is left open.
    """
    yield get_db()


def get_next_sequence(db, name: str) -> int:
    """
    Increment the named counter and return its new value.

    The counter lives in the ``counters`` collection as a document whose
    ``_id`` is *name*; its ``seq`` field is incremented by one with an
    upserting ``find_one_and_update`` and the updated document is read back.

    Args:
        db: Database handle exposing a ``counters`` collection
        name: Name of the sequence (e.g., 'property_id', 'user_id')

    Returns:
        The value of ``seq`` after the increment, as an ``int``
    """
    from pymongo import ReturnDocument

    selector = {"_id": name}
    bump = {"$inc": {"seq": 1}}
    counter = db.counters.find_one_and_update(
        selector,
        bump,
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    next_value = counter.get("seq", 1)
    return int(next_value)


def create_collections(db):
    """Create every required collection that the database does not list yet.

    Collections reported by ``db.list_collection_names()`` are skipped. A
    ``CollectionInvalid`` raised by ``create_collection`` is ignored, and no
    success line is printed for that collection.
    """
    print("[DB] Creating collections...")

    required = (
        'users', 'wallets', 'properties', 'property_specifications',
        'amenities', 'property_images', 'investments', 'transactions',
        'portfolios', 'documents', 'secondary_market_transactions',
        'otps', 'sessions', 'certificates', 'rent_distributions',
        'rent_payments', 'counters', 'tokens', 'funded_properties',
    )

    already_present = db.list_collection_names()
    missing = [wanted for wanted in required if wanted not in already_present]

    for coll_name in missing:
        try:
            db.create_collection(coll_name)
        except CollectionInvalid:
            # Collection already exists
            continue
        else:
            print(f"[DB] [SUCCESS] Created collection: {coll_name}")


def ensure_indexes(db):
    """Create every index the application declares, collection by collection.

    The indexes are described in a table of
    ``(collection name, success message, [(key spec, index options), ...])``
    entries. The table is walked in order: each index is created with
    ``create_index`` and the collection's success message is printed once all
    of its indexes have been requested.
    """
    print("[DB] Creating indexes...")

    index_plan = [
        # USERS COLLECTION
        ("users", "[DB] [SUCCESS] Users indexes created", [
            ([("email", ASCENDING)], {"unique": True, "name": "idx_email_unique"}),
            ([("role", ASCENDING)], {"name": "idx_role"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
            ([("deleted", ASCENDING)], {"name": "idx_deleted"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
        ]),
        # OTPs COLLECTION (for email verification)
        ("otps", "[DB] [SUCCESS] OTPs indexes created (with TTL)", [
            ([("email", ASCENDING), ("purpose", ASCENDING)], {"name": "idx_email_purpose"}),
            # TTL index: automatically deletes OTPs after they expire
            ([("expires_at", ASCENDING)], {"name": "idx_expires_at_ttl", "expireAfterSeconds": 0}),
            ([("verified", ASCENDING)], {"name": "idx_verified"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
        ]),
        # WALLETS COLLECTION
        ("wallets", "[DB] [SUCCESS] Wallets indexes created", [
            ([("user_id", ASCENDING)], {"unique": True, "name": "idx_user_id_unique"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
        ]),
        # PROPERTIES COLLECTION
        ("properties", "[DB] [SUCCESS] Properties indexes created", [
            ([("title", TEXT)], {"name": "idx_title_text"}),
            ([("location", ASCENDING)], {"name": "idx_location"}),
            ([("property_type", ASCENDING)], {"name": "idx_property_type"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
            ([("deleted", ASCENDING)], {"name": "idx_deleted"}),
            ([("created_by", ASCENDING)], {"name": "idx_created_by"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
            ([("funded_date", DESCENDING)], {"name": "idx_funded_date"}),
            ([("available_tokens", ASCENDING)], {"name": "idx_available_tokens"}),
        ]),
        # PROPERTY SPECIFICATIONS COLLECTION
        ("property_specifications", "[DB] [SUCCESS] Property specifications indexes created", [
            ([("property_id", ASCENDING)], {"unique": True, "name": "idx_property_id_unique"}),
        ]),
        # AMENITIES COLLECTION
        ("amenities", "[DB] [SUCCESS] Amenities indexes created", [
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
        ]),
        # PROPERTY IMAGES COLLECTION
        ("property_images", "[DB] [SUCCESS] Property images indexes created", [
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("is_main", ASCENDING)], {"name": "idx_is_main"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
        ]),
        # INVESTMENTS COLLECTION
        ("investments", "[DB] [SUCCESS] Investments indexes created", [
            ([("user_id", ASCENDING)], {"name": "idx_user_id"}),
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("user_id", ASCENDING), ("property_id", ASCENDING)], {"name": "idx_user_property"}),
            ([("status", ASCENDING)], {"name": "idx_status"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
        ]),
        # TRANSACTIONS COLLECTION
        ("transactions", "[DB] [SUCCESS] Transactions indexes created", [
            ([("user_id", ASCENDING)], {"name": "idx_user_id"}),
            ([("wallet_id", ASCENDING)], {"name": "idx_wallet_id"}),
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("type", ASCENDING)], {"name": "idx_type"}),
            ([("status", ASCENDING)], {"name": "idx_status"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
            ([("user_id", ASCENDING), ("type", ASCENDING)], {"name": "idx_user_type"}),
        ]),
        # PORTFOLIOS COLLECTION
        ("portfolios", "[DB] [SUCCESS] Portfolios indexes created", [
            ([("user_id", ASCENDING)], {"unique": True, "name": "idx_user_id_unique"}),
            ([("updated_at", DESCENDING)], {"name": "idx_updated_at"}),
        ]),
        # DOCUMENTS COLLECTION
        ("documents", "[DB] [SUCCESS] Documents indexes created", [
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("file_type", ASCENDING)], {"name": "idx_file_type"}),
            ([("uploaded_by", ASCENDING)], {"name": "idx_uploaded_by"}),
        ]),
        # SECONDARY MARKET TRANSACTIONS COLLECTION
        ("secondary_market_transactions", "[DB] [SUCCESS] Secondary market transactions indexes created", [
            ([("transaction_id", ASCENDING)], {"unique": True, "name": "idx_transaction_id_unique"}),
            ([("seller_id", ASCENDING)], {"name": "idx_seller_id"}),
            ([("buyer_id", ASCENDING)], {"name": "idx_buyer_id"}),
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("status", ASCENDING)], {"name": "idx_status"}),
            ([("transaction_type", ASCENDING)], {"name": "idx_transaction_type"}),
            ([("initiated_at", DESCENDING)], {"name": "idx_initiated_at"}),
            ([("seller_id", ASCENDING), ("status", ASCENDING)], {"name": "idx_seller_status"}),
            ([("property_id", ASCENDING), ("status", ASCENDING)], {"name": "idx_property_status"}),
            ([("blockchain_tx_hash", ASCENDING)], {"name": "idx_blockchain_tx"}),
        ]),
        # SESSIONS COLLECTION (Device-bound Authentication)
        ("sessions", "[DB] [SUCCESS] Sessions indexes created (with TTL)", [
            ([("session_id", ASCENDING)], {"unique": True, "name": "idx_session_id_unique"}),
            ([("user_id", ASCENDING)], {"name": "idx_user_id"}),
            ([("expires_at", ASCENDING)], {"name": "idx_expires_at_ttl", "expireAfterSeconds": 0}),
            ([("user_id", ASCENDING), ("device_fingerprint", ASCENDING)], {"name": "idx_user_device"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
        ]),
        # CERTIFICATES COLLECTION (Ownership Certificates)
        ("certificates", "[DB] [SUCCESS] Certificates indexes created", [
            ([("certificate_id", ASCENDING)], {"unique": True, "name": "idx_certificate_id_unique"}),
            ([("user_id", ASCENDING)], {"name": "idx_user_id"}),
            (
                [("user_id", ASCENDING), ("property_details.property_id", ASCENDING)],
                {"name": "idx_user_property"},
            ),
            ([("issued_date", ASCENDING)], {"name": "idx_issued_date"}),
            ([("is_valid", ASCENDING)], {"name": "idx_is_valid"}),
        ]),
        # RENT DISTRIBUTIONS COLLECTION
        ("rent_distributions", "[DB] [SUCCESS] Rent distributions indexes created", [
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("distribution_date", DESCENDING)], {"name": "idx_distribution_date"}),
            ([("status", ASCENDING)], {"name": "idx_status"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
            ([("property_id", ASCENDING), ("status", ASCENDING)], {"name": "idx_property_status"}),
        ]),
        # RENT PAYMENTS COLLECTION
        ("rent_payments", "[DB] [SUCCESS] Rent payments indexes created", [
            ([("distribution_id", ASCENDING)], {"name": "idx_distribution_id"}),
            ([("user_id", ASCENDING)], {"name": "idx_user_id"}),
            ([("property_id", ASCENDING)], {"name": "idx_property_id"}),
            ([("user_id", ASCENDING), ("property_id", ASCENDING)], {"name": "idx_user_property"}),
            ([("payment_date", DESCENDING)], {"name": "idx_payment_date"}),
            ([("payment_status", ASCENDING)], {"name": "idx_payment_status"}),
        ]),
        # TOKENS COLLECTION (Property Tokens)
        ("tokens", "[DB] [SUCCESS] Tokens indexes created", [
            ([("property_id", ASCENDING)], {"unique": True, "name": "idx_property_id_unique"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
        ]),
        # FUNDED PROPERTIES COLLECTION (Fully Funded Properties Archive)
        ("funded_properties", "[DB] [SUCCESS] Funded properties indexes created", [
            ([("property_id", ASCENDING)], {"unique": True, "name": "idx_property_id_unique"}),
            ([("funded_date", DESCENDING)], {"name": "idx_funded_date"}),
            ([("funding_status", ASCENDING)], {"name": "idx_funding_status"}),
            ([("is_active", ASCENDING)], {"name": "idx_is_active"}),
            ([("created_at", DESCENDING)], {"name": "idx_created_at"}),
        ]),
    ]

    for coll_name, done_message, index_specs in index_plan:
        collection = getattr(db, coll_name)
        for key_spec, options in index_specs:
            collection.create_index(key_spec, **options)
        print(done_message)

    print("[DB] ✅ All indexes created successfully")


def _encrypt_plain_wallet_seeds(db):
    """Encrypt every wallet ``xrp_seed`` that is not already encrypted.

    A stored seed is first tried as base64: if it decodes to text starting
    with "s", the decoded text is encrypted; otherwise the stored value itself
    is encrypted. Returns the number of wallet documents updated.
    """
    updated = 0
    cursor = db.wallets.find({"xrp_seed": {"$exists": True, "$ne": None}})
    for wallet in cursor:
        stored = wallet.get("xrp_seed")
        if not stored or is_encrypted(stored):
            continue
        # Detect base64 vs plain: attempt decode
        try:
            decoded = base64.b64decode(stored).decode()
        except Exception:
            plaintext = stored
        else:
            plaintext = decoded if decoded.startswith("s") else stored
        ciphertext = encrypt_secret(plaintext, settings.ENCRYPTION_KEY)
        db.wallets.update_one({"_id": wallet["_id"]}, {"$set": {"xrp_seed": ciphertext}})
        updated += 1
    return updated


def init_mongo(app=None):
    """Prepare the MongoDB database: collections, indexes, seed migration.

    Steps, in order: obtain the database, create missing collections, create
    indexes, then (only when ``settings.encryption_enabled()`` is true)
    encrypt legacy wallet seeds. A failure in the seed migration is reported
    as a warning and does not abort initialization. When *app* is truthy the
    database is stored on ``app.state.mongo``.

    Returns:
        The database handle.

    Raises:
        Any other exception is printed with its traceback and re-raised.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("INITIALIZING MONGODB DATABASE")
    print(rule + "\n")

    try:
        db = get_db()

        create_collections(db)
        ensure_indexes(db)

        # Optional: migrate legacy wallet seeds to encrypted form if encryption enabled
        try:
            if settings.encryption_enabled():
                migrated = _encrypt_plain_wallet_seeds(db)
                if migrated:
                    print(f"[DB] [SUCCESS] Migrated {migrated} wallet seeds to encrypted format")
        except Exception as mig_err:
            print(f"[DB][WARNING] Seed encryption migration skipped: {mig_err}")

        # Attach to FastAPI app if provided
        if app:
            app.state.mongo = db
            print("[DB] [SUCCESS] Database attached to FastAPI app")

        print("\n[DB] ✅ Database initialization complete!")
        print(rule + "\n")

        return db

    except Exception as e:
        import traceback

        print(f"\n[DB] [ERROR] ERROR during initialization: {e!s}")
        traceback.print_exc()
        raise


def close_mongo():
    """Close the shared MongoDB client, if one exists, and forget it.

    After this call the module-level ``_client`` is ``None`` again.
    """
    global _client
    if not _client:
        return
    _client.close()
    _client = None
    print("[DB] Connection closed")


# Optional: Database health check
def check_db_health():
    """Report whether the database answers a ping and can list collections.

    Returns:
        On success, a dict with ``status`` "healthy", the database name, the
        number of collections and their names. If anything raises, a dict
        with ``status`` "unhealthy" and the error text.
    """
    try:
        # Ping the server through the shared client
        get_client().admin.command('ping')

        # Listing collections confirms the configured database is reachable
        names = get_db().list_collection_names()

        report = {
            "status": "healthy",
            "database": settings.MONGODB_DB_NAME,
            "collections": len(names),
            "collection_names": names,
        }
        return report
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc)}