# Source: AIDA / app/database.py
# Last commit: "mvp fix" (ee7602c), author: destinyebuka
# ============================================================
# app/database.py - MongoDB Connection Management (FIXED)
# ============================================================
import logging
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient as AsyncClient
from motor.motor_asyncio import AsyncIOMotorDatabase as AsyncDatabase

from app.config import settings
logger = logging.getLogger(__name__)
class DatabaseConnection:
    """Holder for the shared Motor client and the selected database.

    Both attributes start as ``None``; the functions in this module treat
    ``None`` as "not connected" and compare against it explicitly.
    Nothing here enforces a single instance; the module simply creates one
    (``db``) and uses it everywhere.
    """

    # Motor client; None until a connection has been established.
    client: Optional[AsyncClient] = None
    # Database handle obtained from the client; None until connected.
    database: Optional[AsyncDatabase] = None


# Module-wide instance shared by every function in this file.
db = DatabaseConnection()
async def connect_db():
    """Open the MongoDB connection and publish it on the shared ``db`` holder.

    A client is created for ``settings.MONGODB_URL`` with the pool/timeout
    options below, the database named ``settings.MONGODB_DATABASE`` is
    selected, and a ``ping`` command is awaited to verify the connection.

    ``db.client`` and ``db.database`` are assigned only after the ping
    succeeds, so a failed attempt never leaves handles behind that would
    make ``get_db()`` return a database that cannot be reached.

    Raises:
        Exception: whatever was raised while creating the client or pinging;
            it is logged and re-raised unchanged.
    """
    client = None
    try:
        client = AsyncClient(
            settings.MONGODB_URL,
            maxPoolSize=100,  # Max connections (Scale: ~300 per replica member)
            minPoolSize=10,  # Keep warm connections ready
            maxIdleTimeMS=45000,  # Close idle connections after 45s
            waitQueueTimeoutMS=10000,  # Fail fast if pool is exhausted (10s)
            serverSelectionTimeoutMS=5000,  # Fail fast if DB is down (5s)
        )
        database = client[settings.MONGODB_DATABASE]
        # Test connection before exposing the handles to the rest of the app
        await client.admin.command("ping")
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {str(e)}")
        if client is not None:
            # Do not leak the client whose connection attempt failed.
            client.close()
        raise

    db.client = client
    db.database = database
    logger.info(f"Connected to MongoDB: {settings.MONGODB_DATABASE}")
async def disconnect_db():
    """Close the MongoDB client, if one is open, and clear the shared handles.

    Does nothing when no client is stored. After a successful call both
    ``db.client`` and ``db.database`` are ``None``, so a second call is a
    no-op and ``get_db()`` / ``get_db_sync()`` raise their "not initialized"
    error instead of returning a database bound to a closed client.
    """
    client = db.client
    if client is None:  # explicit None comparison, not a truthiness test
        return

    # Reset the handles first so they are cleared even if close() raises.
    db.client = None
    db.database = None
    client.close()
    logger.info("Disconnected from MongoDB")
async def get_db() -> AsyncDatabase:
    """Return the shared database handle (awaitable accessor).

    The function is declared ``async`` so that it can be awaited from async
    call sites; it performs no I/O of its own.

    Raises:
        RuntimeError: if ``db.database`` is still ``None``.
    """
    handle = db.database
    # Identity comparison with None on purpose; the original author notes
    # that a bool()/truthiness check must not be used here.
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")
def get_db_sync() -> AsyncDatabase:
    """Return the shared database handle without needing ``await``.

    Same check and same error as ``get_db()``, but callable from
    synchronous code (use with caution).

    Raises:
        RuntimeError: if ``db.database`` is still ``None``.
    """
    handle = db.database
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")
# ============================================================
# Helper functions to get collections
# ============================================================
async def get_users_collection():
    """Return the ``users`` collection of the connected database.

    Raises:
        RuntimeError: propagated from ``get_db()`` when not connected.
    """
    return (await get_db())["users"]
async def get_otps_collection():
    """Return the ``otps`` collection of the connected database.

    Raises:
        RuntimeError: propagated from ``get_db()`` when not connected.
    """
    return (await get_db())["otps"]
# ============================================================
# Create indexes on startup
# ============================================================
async def ensure_indexes():
    """Create the indexes for the ``users`` and ``otps`` collections.

    Users: sparse unique indexes on ``email`` and ``phone``, plus plain
    indexes on ``role`` and ``isActive``.
    OTPs: plain indexes on ``identifier`` and ``purpose``, plus a TTL index
    on ``createdAt`` that expires documents after 15 minutes.

    An existing ``createdAt_1`` index is dropped only when its stored
    ``expireAfterSeconds`` differs from the wanted TTL, so the index is not
    rebuilt on every startup.

    Best-effort: any exception is logged and swallowed; nothing is raised
    to the caller.
    """
    otp_ttl_seconds = 900  # 15 min TTL
    try:
        users_col = await get_users_collection()
        otps_col = await get_otps_collection()

        # Users indexes
        await users_col.create_index("email", sparse=True, unique=True)
        await users_col.create_index("phone", sparse=True, unique=True)
        await users_col.create_index("role")
        await users_col.create_index("isActive")

        # OTP indexes - handle TTL index conflict by dropping the existing
        # createdAt index, but only if its options really differ.
        existing_indexes = await otps_col.index_information()
        current_ttl_index = existing_indexes.get("createdAt_1")
        if (
            current_ttl_index is not None
            and current_ttl_index.get("expireAfterSeconds") != otp_ttl_seconds
        ):
            await otps_col.drop_index("createdAt_1")

        await otps_col.create_index("identifier")
        await otps_col.create_index("purpose")
        await otps_col.create_index(
            [("createdAt", 1)], expireAfterSeconds=otp_ttl_seconds
        )

        logger.info("Database indexes created successfully")
    except Exception as e:
        # Deliberately not re-raised: index creation is best-effort here.
        logger.error(f"Failed to create indexes: {str(e)}")
async def ensure_review_indexes():
    """Create the indexes for the ``reviews`` collection.

    Best-effort: any exception is logged and swallowed; nothing is raised
    to the caller.
    """
    try:
        reviews = (await get_db())["reviews"]

        # Look up reviews by the thing being reviewed
        target_key = [("target_type", 1), ("target_id", 1)]
        await reviews.create_index(target_key)

        # Unique per (reviewer, target): the same user can't review the
        # same target twice
        reviewer_target_key = [("reviewer_id", 1)] + target_key
        await reviews.create_index(reviewer_target_key, unique=True)

        # Sorting by creation time
        await reviews.create_index("created_at")

        logger.info("Review indexes created successfully")
    except Exception as err:
        logger.error(f"Failed to create review indexes: {str(err)}")
async def _drop_index_if_present(collection, index_name):
    """Best-effort drop of ``index_name``; return True if the drop succeeded.

    Any exception from ``drop_index`` is logged at debug level and reported
    as ``False`` instead of being raised. The original code assumed such a
    failure means the index does not exist; logging keeps other causes
    visible without changing the best-effort behavior.
    """
    try:
        await collection.drop_index(index_name)
    except Exception as e:
        logger.debug(f"Skipped dropping index {index_name}: {str(e)}")
        return False
    return True


async def ensure_chat_indexes():
    """Create the indexes for the ``conversations``, ``messages`` and
    ``calls`` collections.

    Older conversation indexes are dropped first (best-effort) and then
    recreated with the intended options.

    Best-effort overall: any exception is logged and swallowed; nothing is
    raised to the caller.
    """
    try:
        database = await get_db()
        conversations_col = database["conversations"]
        messages_col = database["messages"]
        calls_col = database["calls"]

        # Conversations indexes
        # Unique index on participants_key: ONE conversation per user pair
        # (not per listing). Using participants_key ("::" joined sorted IDs)
        # avoids multikey index issues.
        # Drop existing indexes if they exist to recreate properly.
        for index_name in ("participants_1", "participants_key_1"):
            await _drop_index_if_present(conversations_col, index_name)

        # CRITICAL: Drop the old listing_id + participants compound index.
        # This index blocks multiple AIDA conversations because all use
        # listing_id="system" and participants contains "AIDA_BOT".
        if await _drop_index_if_present(
            conversations_col, "listing_id_1_participants_1"
        ):
            logger.info("Dropped problematic listing_id_1_participants_1 index")

        await conversations_col.create_index("participants_key", unique=True)
        # Keep a non-unique index on participants for queries
        await conversations_col.create_index("participants")
        # Index for sorting by last activity
        await conversations_col.create_index([("updated_at", -1)])

        # Messages indexes
        # Index for getting messages in a conversation
        await messages_col.create_index(
            [("conversation_id", 1), ("created_at", -1)]
        )
        # Index for querying by sender
        await messages_col.create_index("sender_id")

        # Calls indexes
        # Index for getting call history
        await calls_col.create_index(
            [("conversation_id", 1), ("created_at", -1)]
        )
        # Indexes for querying by caller/receiver
        await calls_col.create_index("caller_id")
        await calls_col.create_index("receiver_id")
        # Index for querying by status
        await calls_col.create_index("status")

        logger.info("Chat indexes created successfully")
    except Exception as e:
        # Deliberately not re-raised: index creation is best-effort here.
        logger.error(f"Failed to create chat indexes: {str(e)}")