Spaces:
Running
Running
File size: 7,236 Bytes
79ef7e1 ee7602c 79ef7e1 4e15311 79ef7e1 bbcb4f3 0a645ca 6687173 4e15311 6687173 49990ab 0a645ca 6687173 0a645ca 6687173 0a645ca |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# ============================================================
# app/database.py - MongoDB Connection Management (FIXED)
# ============================================================
from motor.motor_asyncio import AsyncIOMotorClient as AsyncClient
from motor.motor_asyncio import AsyncIOMotorDatabase as AsyncDatabase
import logging
from app.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DatabaseConnection:
    """Holder for the shared Motor client and the selected database.

    Both attributes are declared at class level and default to ``None``.
    Nothing in this class enforces a single instance; the module keeps one
    instance in ``db`` and every helper in this file reads from it.
    """

    # Motor client; stays None until connect_db() assigns it.
    client: AsyncClient = None

    # Database handle taken from the client; None until connect_db() assigns it.
    database: AsyncDatabase = None
# Shared connection holder used by every function in this module.
db = DatabaseConnection()
async def connect_db():
    """Open the MongoDB connection and publish it on the module-level ``db``.

    Builds a Motor client from ``settings.MONGODB_URL``, selects
    ``settings.MONGODB_DATABASE`` and checks that the server answers a
    ``ping`` command. ``db.client`` and ``db.database`` are assigned only
    after the ping succeeds, so a failed attempt never leaves a client
    visible to ``get_db()``; the client built for the failed attempt is
    closed before the error propagates.

    Raises:
        Exception: whatever was raised while building the client or pinging
            the server. It is logged and re-raised unchanged.
    """
    client = None
    try:
        client = AsyncClient(
            settings.MONGODB_URL,
            maxPoolSize=100,  # Max connections (Scale: ~300 per replica member)
            minPoolSize=10,  # Keep warm connections ready
            maxIdleTimeMS=45000,  # Close idle connections after 45s
            waitQueueTimeoutMS=10000,  # Fail fast if pool is exhausted (10s)
            serverSelectionTimeoutMS=5000,  # Fail fast if DB is down (5s)
        )
        database = client[settings.MONGODB_DATABASE]
        # Test connection before exposing it to the rest of the app
        await client.admin.command("ping")
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {str(e)}")
        if client is not None:
            # Release the client created for the failed attempt.
            client.close()
        raise

    db.client = client
    db.database = database
    logger.info(f"Connected to MongoDB: {settings.MONGODB_DATABASE}")
async def disconnect_db():
    """Close the MongoDB client, if one is set, and reset the shared state.

    After this call ``db.client`` and ``db.database`` are ``None`` again, so
    ``get_db()`` raises its "not initialized" error instead of returning a
    handle that belongs to a closed client. Calling this when no client is
    set does nothing.
    """
    client = db.client
    # Compare with None explicitly rather than relying on the client's truthiness.
    if client is None:
        return

    # Clear the shared references first so no caller picks up a closing client.
    db.client = None
    db.database = None
    client.close()
    logger.info("Disconnected from MongoDB")
async def get_db() -> AsyncDatabase:
    """Return the shared database handle (coroutine variant).

    Declared ``async`` so callers can ``await`` it; the body itself performs
    no I/O.

    Raises:
        RuntimeError: if ``db.database`` has not been set yet.
    """
    handle = db.database
    # Explicit None comparison: the handle is never tested for truthiness.
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")
def get_db_sync() -> AsyncDatabase:
    """Return the shared database handle without needing ``await``.

    Performs the same check as ``get_db`` for callers that are not
    coroutines.

    Raises:
        RuntimeError: if ``db.database`` has not been set yet.
    """
    handle = db.database
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")
# ============================================================
# Helper functions to get collections
# ============================================================
async def get_users_collection():
    """Return the ``users`` collection of the shared database.

    Propagates the ``RuntimeError`` from ``get_db()`` when the database has
    not been initialized.
    """
    return (await get_db())["users"]
async def get_otps_collection():
    """Return the ``otps`` collection of the shared database.

    Propagates the ``RuntimeError`` from ``get_db()`` when the database has
    not been initialized.
    """
    return (await get_db())["otps"]
# ============================================================
# Create indexes on startup
# ============================================================
async def ensure_indexes():
    """Create the indexes for the ``users`` and ``otps`` collections.

    Users get sparse unique indexes on ``email`` and ``phone`` plus plain
    indexes on ``role`` and ``isActive``. OTPs get plain indexes on
    ``identifier`` and ``purpose`` and a TTL index on ``createdAt``.

    An existing ``createdAt_1`` index is dropped only when its
    ``expireAfterSeconds`` differs from the wanted TTL, so an index that is
    already correct is not dropped and rebuilt on every call.

    Any failure is logged and not re-raised.
    """
    otp_ttl_seconds = 900  # 15 min TTL

    try:
        users_col = await get_users_collection()
        otps_col = await get_otps_collection()

        # Users indexes
        await users_col.create_index("email", sparse=True, unique=True)
        await users_col.create_index("phone", sparse=True, unique=True)
        await users_col.create_index("role")
        await users_col.create_index("isActive")

        # OTP indexes - resolve a TTL index conflict by dropping the existing
        # index only if its options differ from the ones requested below.
        try:
            existing = await otps_col.index_information()
            ttl_index = existing.get("createdAt_1")
            if (
                ttl_index is not None
                and ttl_index.get("expireAfterSeconds") != otp_ttl_seconds
            ):
                await otps_col.drop_index("createdAt_1")
                logger.info("Dropped createdAt_1 index with outdated TTL options")
        except Exception as drop_error:
            # Best effort: keep going and let create_index report a real conflict.
            logger.warning(
                f"Could not reconcile existing OTP TTL index: {str(drop_error)}"
            )

        await otps_col.create_index("identifier")
        await otps_col.create_index("purpose")
        await otps_col.create_index(
            [("createdAt", 1)], expireAfterSeconds=otp_ttl_seconds
        )

        logger.info("Database indexes created successfully")
    except Exception as e:
        logger.error(f"Failed to create indexes: {str(e)}")
async def ensure_review_indexes():
    """Create the indexes for the ``reviews`` collection.

    Builds, in order: a compound lookup index on the review target, a unique
    compound index on reviewer plus target, and an index on ``created_at``.
    Any failure is logged and not re-raised.
    """
    try:
        reviews = (await get_db())["reviews"]

        # Lookup of all reviews for one target.
        target_key = [("target_type", 1), ("target_id", 1)]
        await reviews.create_index(target_key)

        # Uniqueness: the same reviewer cannot review the same target twice.
        reviewer_target_key = [
            ("reviewer_id", 1),
            ("target_type", 1),
            ("target_id", 1),
        ]
        await reviews.create_index(reviewer_target_key, unique=True)

        # Supports ordering by creation time.
        await reviews.create_index("created_at")

        logger.info("Review indexes created successfully")
    except Exception as e:
        logger.error(f"Failed to create review indexes: {str(e)}")
async def ensure_chat_indexes():
    """Create the indexes for the ``conversations``, ``messages`` and ``calls`` collections.

    Three conversation indexes are dropped first (errors from the drops are
    ignored), then the conversation, message and call indexes are created in
    a fixed order. Any other failure is logged and not re-raised.
    """
    try:
        chat_db = await get_db()
        conversations = chat_db["conversations"]
        messages = chat_db["messages"]
        calls = chat_db["calls"]

        # Conversation indexes that are dropped before (re)creation, paired
        # with the message logged after a successful drop (None = no message).
        #
        # listing_id_1_participants_1 is the old listing_id + participants
        # compound index: it blocks multiple AIDA conversations because all of
        # them use listing_id="system" and participants contains "AIDA_BOT".
        #
        # NOTE(review): participants_1 and participants_key_1 are created again
        # below, so they are dropped and rebuilt on every call - confirm that
        # this is intended.
        indexes_to_drop = (
            ("participants_1", None),
            ("participants_key_1", None),
            (
                "listing_id_1_participants_1",
                "Dropped problematic listing_id_1_participants_1 index",
            ),
        )
        for index_name, dropped_message in indexes_to_drop:
            try:
                await conversations.drop_index(index_name)
                if dropped_message is not None:
                    logger.info(dropped_message)
            except Exception:
                pass  # Index doesn't exist, that's fine

        # Conversations: unique participants_key means ONE conversation per
        # user pair (not per listing). participants_key ("::" joined sorted
        # IDs) is used instead of the array field to avoid multikey index issues.
        await conversations.create_index("participants_key", unique=True)
        # Non-unique index on participants, kept for queries.
        await conversations.create_index("participants")
        # Sorting by last activity.
        await conversations.create_index([("updated_at", -1)])

        # Messages: fetch a conversation's messages, and query by sender.
        await messages.create_index([("conversation_id", 1), ("created_at", -1)])
        await messages.create_index("sender_id")

        # Calls: call history per conversation, then single-field indexes for
        # querying by caller, receiver and status.
        await calls.create_index([("conversation_id", 1), ("created_at", -1)])
        for call_field in ("caller_id", "receiver_id", "status"):
            await calls.create_index(call_field)

        logger.info("Chat indexes created successfully")
    except Exception as e:
        logger.error(f"Failed to create chat indexes: {str(e)}")