# ============================================================
# app/database.py - MongoDB Connection Management (FIXED)
# ============================================================
"""Async MongoDB connection management and index bootstrap helpers."""

import logging
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient as AsyncClient
from motor.motor_asyncio import AsyncIOMotorDatabase as AsyncDatabase

from app.config import settings

logger = logging.getLogger(__name__)

# Name and lifetime of the TTL index on ``otps.createdAt`` (15 minutes).
_OTP_TTL_INDEX_NAME = "createdAt_1"
_OTP_TTL_SECONDS = 900


class DatabaseConnection:
    """Holder for the shared Motor client and database handles.

    Both attributes are ``None`` until ``connect_db()`` succeeds and are reset
    to ``None`` by ``disconnect_db()``.
    """

    client: Optional[AsyncClient] = None
    database: Optional[AsyncDatabase] = None


db = DatabaseConnection()


async def connect_db():
    """Connect to MongoDB and verify the connection with a ``ping``.

    The shared ``db`` handles are published only after the ping succeeds. A
    failed attempt closes the client it created, so retries do not leak
    clients and ``get_db()`` never returns a database that failed to connect.

    Raises:
        Exception: Any error raised while creating the client or pinging the
            server; it is logged and re-raised unchanged.
    """
    client = None
    try:
        client = AsyncClient(
            settings.MONGODB_URL,
            maxPoolSize=100,  # Max connections (Scale: ~300 per replica member)
            minPoolSize=10,  # Keep warm connections ready
            maxIdleTimeMS=45000,  # Close idle connections after 45s
            waitQueueTimeoutMS=10000,  # Fail fast if pool is exhausted (10s)
            serverSelectionTimeoutMS=5000,  # Fail fast if DB is down (5s)
        )
        database = client[settings.MONGODB_DATABASE]

        # Test connection before exposing the handles to the rest of the app.
        await client.admin.command("ping")
    except Exception as e:
        logger.error("Failed to connect to MongoDB: %s", e)
        if client is not None:
            client.close()
        raise

    db.client = client
    db.database = database
    logger.info("Connected to MongoDB: %s", settings.MONGODB_DATABASE)


async def disconnect_db():
    """Close the MongoDB client, if any, and clear the shared handles.

    Safe to call when no connection is open; in that case it does nothing.
    """
    client = db.client
    # Compare with None: the handles must not be truth-tested.
    if client is None:
        return

    # Clear the handles first so get_db() raises instead of returning a
    # database that is bound to a closed client.
    db.client = None
    db.database = None
    client.close()
    logger.info("Disconnected from MongoDB")


async def get_db() -> AsyncDatabase:
    """Return the connected database (async variant for async call sites).

    Raises:
        RuntimeError: If ``connect_db()`` has not completed successfully.
    """
    return get_db_sync()


def get_db_sync() -> AsyncDatabase:
    """Return the connected database synchronously (use with caution).

    Raises:
        RuntimeError: If ``connect_db()`` has not completed successfully.
    """
    # Compare with None instead of using bool() on the database handle.
    if db.database is None:
        raise RuntimeError("Database not initialized. Call connect_db() first.")
    return db.database


# ============================================================
# Helper function to get collections
# ============================================================
async def get_users_collection():
    """Return the ``users`` collection of the connected database."""
    database = await get_db()
    return database["users"]


async def get_otps_collection():
    """Return the ``otps`` collection of the connected database."""
    database = await get_db()
    return database["otps"]


# ============================================================
# Create indexes on startup
# ============================================================
async def _drop_index_if_present(collection, index_name: str) -> bool:
    """Best-effort drop of ``index_name``; return True only if it was dropped."""
    try:
        await collection.drop_index(index_name)
    except Exception as e:
        # Most commonly the index simply does not exist, which is fine. The
        # reason is logged at debug level rather than silently discarded.
        logger.debug("Skipped dropping index %s: %s", index_name, e)
        return False
    return True


async def _drop_stale_ttl_index(
    collection, index_name: str, expire_after_seconds: int
) -> None:
    """Drop ``index_name`` only when it exists with a different TTL.

    Leaving a matching index in place avoids rebuilding it on every startup.
    Failures are logged and otherwise ignored (best effort).
    """
    try:
        existing = await collection.index_information()
    except Exception as e:
        logger.warning("Could not inspect indexes for %s: %s", index_name, e)
        return

    current = existing.get(index_name)
    if current is None:
        return  # Nothing to reconcile; the index will be created fresh.
    if current.get("expireAfterSeconds") == expire_after_seconds:
        return  # Already has the wanted TTL; create_index will be a no-op.

    if await _drop_index_if_present(collection, index_name):
        logger.info("Dropped index %s with outdated TTL options", index_name)


async def ensure_indexes():
    """Create necessary indexes for the ``users`` and ``otps`` collections.

    Failures are logged (with traceback) and swallowed so that index creation
    never aborts the caller.
    """
    try:
        users_col = await get_users_collection()
        otps_col = await get_otps_collection()

        # Users indexes
        await users_col.create_index("email", sparse=True, unique=True)
        await users_col.create_index("phone", sparse=True, unique=True)
        await users_col.create_index("role")
        await users_col.create_index("isActive")

        # OTP indexes - handle TTL index conflict by dropping the existing
        # index only if its options differ from the ones requested below.
        await _drop_stale_ttl_index(
            otps_col, _OTP_TTL_INDEX_NAME, _OTP_TTL_SECONDS
        )
        await otps_col.create_index("identifier")
        await otps_col.create_index("purpose")
        await otps_col.create_index(
            [("createdAt", 1)], expireAfterSeconds=_OTP_TTL_SECONDS
        )  # 15 min TTL

        logger.info("Database indexes created successfully")
    except Exception as e:
        logger.exception("Failed to create indexes: %s", e)


async def ensure_review_indexes():
    """Create indexes for the ``reviews`` collection.

    Failures are logged (with traceback) and swallowed.
    """
    try:
        database = await get_db()
        reviews_col = database["reviews"]

        # Index for querying reviews by target
        await reviews_col.create_index([("target_type", 1), ("target_id", 1)])

        # Unique index to prevent duplicate reviews
        # (same user can't review same target twice)
        await reviews_col.create_index(
            [("reviewer_id", 1), ("target_type", 1), ("target_id", 1)],
            unique=True,
        )

        # Index for sorting by created_at
        await reviews_col.create_index("created_at")

        logger.info("Review indexes created successfully")
    except Exception as e:
        logger.exception("Failed to create review indexes: %s", e)


async def ensure_chat_indexes():
    """Create indexes for the ``conversations``, ``messages`` and ``calls``
    collections.

    Failures are logged (with traceback) and swallowed.
    """
    try:
        database = await get_db()
        conversations_col = database["conversations"]
        messages_col = database["messages"]
        calls_col = database["calls"]

        # Conversations indexes
        # Unique index on participants_key: ONE conversation per user pair
        # (not per listing). Using participants_key ("::" joined sorted IDs)
        # avoids multikey index issues.
        # Drop existing indexes if they exist to recreate them properly.
        for stale_name in ("participants_1", "participants_key_1"):
            await _drop_index_if_present(conversations_col, stale_name)

        # CRITICAL: Drop the old listing_id + participants compound index.
        # This index blocks multiple AIDA conversations because all use
        # listing_id="system" and participants contains "AIDA_BOT".
        if await _drop_index_if_present(
            conversations_col, "listing_id_1_participants_1"
        ):
            logger.info("Dropped problematic listing_id_1_participants_1 index")

        await conversations_col.create_index("participants_key", unique=True)

        # Keep a non-unique index on participants for queries
        await conversations_col.create_index("participants")

        # Index for sorting by last activity
        await conversations_col.create_index([("updated_at", -1)])

        # Messages indexes
        # Index for getting messages in a conversation
        await messages_col.create_index(
            [("conversation_id", 1), ("created_at", -1)]
        )

        # Index for querying by sender
        await messages_col.create_index("sender_id")

        # Calls indexes
        # Index for getting call history
        await calls_col.create_index([("conversation_id", 1), ("created_at", -1)])

        # Indexes for querying by caller/receiver
        await calls_col.create_index("caller_id")
        await calls_col.create_index("receiver_id")

        # Index for querying by status
        await calls_col.create_index("status")

        logger.info("Chat indexes created successfully")
    except Exception as e:
        logger.exception("Failed to create chat indexes: %s", e)