Spaces:
Running
Running
| # ============================================================ | |
| # app/database.py - MongoDB Connection Management (FIXED) | |
| # ============================================================ | |
| from motor.motor_asyncio import AsyncIOMotorClient as AsyncClient | |
| from motor.motor_asyncio import AsyncIOMotorDatabase as AsyncDatabase | |
| import logging | |
| from app.config import settings | |
| logger = logging.getLogger(__name__) | |
| class DatabaseConnection: | |
| """Singleton for MongoDB async connection""" | |
| client: AsyncClient = None | |
| database: AsyncDatabase = None | |
| db = DatabaseConnection() | |
| async def connect_db(): | |
| """Connect to MongoDB""" | |
| try: | |
| db.client = AsyncClient( | |
| settings.MONGODB_URL, | |
| maxPoolSize=100, # Max connections (Scale: ~300 per replica member) | |
| minPoolSize=10, # Keep warm connections ready | |
| maxIdleTimeMS=45000, # Close idle connections after 45s | |
| waitQueueTimeoutMS=10000, # Fail fast if pool is exhausted (10s) | |
| serverSelectionTimeoutMS=5000, # Fail fast if DB is down (5s) | |
| ) | |
| db.database = db.client[settings.MONGODB_DATABASE] | |
| # Test connection | |
| await db.client.admin.command("ping") | |
| logger.info(f"Connected to MongoDB: {settings.MONGODB_DATABASE}") | |
| except Exception as e: | |
| logger.error(f"Failed to connect to MongoDB: {str(e)}") | |
| raise | |
| async def disconnect_db(): | |
| """Disconnect from MongoDB""" | |
| if db.client is not None: # FIX: Use 'is not None' instead of bool check | |
| db.client.close() | |
| logger.info("Disconnected from MongoDB") | |
| async def get_db() -> AsyncDatabase: | |
| """Get database instance - NOW ASYNC""" | |
| # FIX 1: Compare with None instead of using bool() | |
| # FIX 2: Make this async since it was being called in async context | |
| if db.database is None: | |
| raise RuntimeError("Database not initialized. Call connect_db() first.") | |
| return db.database | |
| def get_db_sync() -> AsyncDatabase: | |
| """Get database instance synchronously (use with caution)""" | |
| if db.database is None: | |
| raise RuntimeError("Database not initialized. Call connect_db() first.") | |
| return db.database | |
| # ============================================================ | |
| # Helper function to get collections | |
| # ============================================================ | |
| async def get_users_collection(): | |
| """Get users collection""" | |
| database = await get_db() | |
| return database["users"] | |
| async def get_otps_collection(): | |
| """Get OTPs collection""" | |
| database = await get_db() | |
| return database["otps"] | |
| # ============================================================ | |
| # Create indexes on startup | |
| # ============================================================ | |
| async def ensure_indexes(): | |
| """Create necessary indexes for collections""" | |
| try: | |
| users_col = await get_users_collection() | |
| otps_col = await get_otps_collection() | |
| # Users indexes | |
| await users_col.create_index("email", sparse=True, unique=True) | |
| await users_col.create_index("phone", sparse=True, unique=True) | |
| await users_col.create_index("role") | |
| await users_col.create_index("isActive") | |
| # OTP indexes - handle TTL index conflict by dropping if options differ | |
| try: | |
| # Try to drop existing TTL index with different options | |
| await otps_col.drop_index("createdAt_1") | |
| except Exception: | |
| pass # Index doesn't exist, that's fine | |
| await otps_col.create_index("identifier") | |
| await otps_col.create_index("purpose") | |
| await otps_col.create_index([("createdAt", 1)], expireAfterSeconds=900) # 15 min TTL | |
| logger.info("Database indexes created successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to create indexes: {str(e)}") | |
| async def ensure_review_indexes(): | |
| """Create indexes for reviews collection""" | |
| try: | |
| database = await get_db() | |
| reviews_col = database["reviews"] | |
| # Index for querying reviews by target | |
| await reviews_col.create_index([("target_type", 1), ("target_id", 1)]) | |
| # Unique index to prevent duplicate reviews (same user can't review same target twice) | |
| await reviews_col.create_index( | |
| [("reviewer_id", 1), ("target_type", 1), ("target_id", 1)], | |
| unique=True | |
| ) | |
| # Index for sorting by created_at | |
| await reviews_col.create_index("created_at") | |
| logger.info("Review indexes created successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to create review indexes: {str(e)}") | |
| async def ensure_chat_indexes(): | |
| """Create indexes for chat collections""" | |
| try: | |
| database = await get_db() | |
| conversations_col = database["conversations"] | |
| messages_col = database["messages"] | |
| calls_col = database["calls"] | |
| # Conversations indexes | |
| # Unique index on participants_key: ONE conversation per user pair (not per listing) | |
| # Using participants_key ("::" joined sorted IDs) avoids multikey index issues | |
| # Drop existing indexes if they exist to recreate properly | |
| try: | |
| await conversations_col.drop_index("participants_1") | |
| except Exception: | |
| pass # Index doesn't exist, that's fine | |
| try: | |
| await conversations_col.drop_index("participants_key_1") | |
| except Exception: | |
| pass # Index doesn't exist, that's fine | |
| # CRITICAL: Drop the old listing_id + participants compound index | |
| # This index blocks multiple AIDA conversations because all use listing_id="system" | |
| # and participants contains "AIDA_BOT" | |
| try: | |
| await conversations_col.drop_index("listing_id_1_participants_1") | |
| logger.info("Dropped problematic listing_id_1_participants_1 index") | |
| except Exception: | |
| pass # Index doesn't exist, that's fine | |
| await conversations_col.create_index( | |
| "participants_key", | |
| unique=True | |
| ) | |
| # Keep a non-unique index on participants for queries | |
| await conversations_col.create_index("participants") | |
| # Index for sorting by last activity | |
| await conversations_col.create_index([("updated_at", -1)]) | |
| # Messages indexes | |
| # Index for getting messages in a conversation | |
| await messages_col.create_index([("conversation_id", 1), ("created_at", -1)]) | |
| # Index for querying by sender | |
| await messages_col.create_index("sender_id") | |
| # Calls indexes | |
| # Index for getting call history | |
| await calls_col.create_index([("conversation_id", 1), ("created_at", -1)]) | |
| # Indexes for querying by caller/receiver | |
| await calls_col.create_index("caller_id") | |
| await calls_col.create_index("receiver_id") | |
| # Index for querying by status | |
| await calls_col.create_index("status") | |
| logger.info("Chat indexes created successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to create chat indexes: {str(e)}") |