File size: 7,236 Bytes
79ef7e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ee7602c
 
 
 
 
 
 
 
79ef7e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e15311
 
 
 
 
 
 
79ef7e1
 
 
 
 
 
bbcb4f3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0a645ca
 
 
 
 
 
 
 
 
 
 
 
6687173
 
 
4e15311
 
 
 
 
6687173
 
 
 
 
49990ab
 
 
 
 
 
 
 
 
0a645ca
6687173
0a645ca
 
6687173
 
0a645ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# ============================================================
# app/database.py - MongoDB Connection Management (FIXED)
# ============================================================

from motor.motor_asyncio import AsyncIOMotorClient as AsyncClient
from motor.motor_asyncio import AsyncIOMotorDatabase as AsyncDatabase
import logging
from app.config import settings

# Module-level logger, named after this module, used by every function below.
logger = logging.getLogger(__name__)

class DatabaseConnection:
    """Holder for the shared MongoDB async client and database handles.

    Both attributes are declared at class level and start out as ``None``.
    Nothing in this class enforces single instantiation; the module simply
    creates one instance (``db``) and uses it everywhere.
    """
    # Motor client handle; None until a connection has been assigned.
    client: AsyncClient = None
    # Database handle obtained from the client; None until assigned.
    database: AsyncDatabase = None

# The one shared instance that the functions in this module read and write.
db = DatabaseConnection()

async def connect_db():
    """Connect to MongoDB and publish the handles on the shared ``db`` object.

    The client and database are built locally and only assigned to
    ``db.client`` / ``db.database`` after the ping succeeds, so a failed
    attempt never leaves half-initialized handles behind for ``get_db()``.

    Raises:
        Exception: whatever the driver raised while creating the client,
            selecting the database, or pinging the server. The error is
            logged, the partially created client is closed, and the
            original exception is re-raised.
    """
    client = None
    try:
        client = AsyncClient(
            settings.MONGODB_URL,
            maxPoolSize=100,           # Max connections (Scale: ~300 per replica member)
            minPoolSize=10,            # Keep warm connections ready
            maxIdleTimeMS=45000,       # Close idle connections after 45s
            waitQueueTimeoutMS=10000,  # Fail fast if pool is exhausted (10s)
            serverSelectionTimeoutMS=5000,  # Fail fast if DB is down (5s)
        )
        database = client[settings.MONGODB_DATABASE]

        # Test connection before exposing the handles to the rest of the app
        await client.admin.command("ping")
    except Exception as e:
        logger.error(f"Failed to connect to MongoDB: {str(e)}")
        # Release the pool of the client that never became usable
        if client is not None:
            client.close()
        raise

    db.client = client
    db.database = database
    logger.info(f"Connected to MongoDB: {settings.MONGODB_DATABASE}")

async def disconnect_db():
    """Disconnect from MongoDB and clear the shared handles.

    Does nothing when no client is set. The handles on ``db`` are reset to
    ``None`` before closing, so ``get_db()`` reports "not initialized"
    afterwards instead of returning a database bound to a closed client,
    and calling this function again is a harmless no-op.
    """
    client = db.client
    if client is None:  # Compare with None; do not rely on the client's truthiness
        return

    # Reset first so the shared state is consistent even if close() raises
    db.client = None
    db.database = None

    client.close()
    logger.info("Disconnected from MongoDB")

async def get_db() -> AsyncDatabase:
    """Return the shared database handle (coroutine variant).

    Raises:
        RuntimeError: if no database handle has been set on ``db`` yet.
    """
    handle = db.database
    # Explicit None comparison: the handle's truthiness is not used
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")

def get_db_sync() -> AsyncDatabase:
    """Return the shared database handle without awaiting (use with caution).

    Raises:
        RuntimeError: if no database handle has been set on ``db`` yet.
    """
    handle = db.database
    if handle is not None:
        return handle
    raise RuntimeError("Database not initialized. Call connect_db() first.")

# ============================================================
# Helper functions to get collections
# ============================================================

async def get_users_collection():
    """Return the ``users`` collection of the shared database.

    Propagates the RuntimeError from ``get_db()`` when the database is not set.
    """
    return (await get_db())["users"]

async def get_otps_collection():
    """Return the ``otps`` collection of the shared database.

    Propagates the RuntimeError from ``get_db()`` when the database is not set.
    """
    return (await get_db())["otps"]

# ============================================================
# Create indexes on startup
# ============================================================

async def _drop_stale_otp_ttl_index(otps_col, ttl_seconds):
    """Drop the OTP ``createdAt_1`` index only if its options differ from the wanted TTL.

    Best-effort: failures are logged and swallowed so that the caller can
    still go on to create the remaining indexes.
    """
    try:
        existing = await otps_col.index_information()
        info = existing.get("createdAt_1")
        if info is None:
            return  # Index doesn't exist yet, nothing to reconcile

        # Compare only the index options, not the key spec / bookkeeping fields
        current_options = {
            key: value for key, value in info.items() if key not in ("key", "v", "ns")
        }
        if current_options == {"expireAfterSeconds": ttl_seconds}:
            return  # Already matches; avoid a pointless drop + rebuild

        await otps_col.drop_index("createdAt_1")
        logger.info("Dropped outdated createdAt_1 TTL index on otps")
    except Exception as e:
        logger.warning(f"Could not reconcile OTP TTL index: {str(e)}")


async def ensure_indexes():
    """Create the indexes for the ``users`` and ``otps`` collections.

    Users get unique sparse indexes on ``email`` and ``phone`` plus plain
    indexes on ``role`` and ``isActive``. OTPs get plain indexes on
    ``identifier`` and ``purpose`` and a TTL index on ``createdAt``.

    An existing ``createdAt_1`` index is dropped only when its options
    differ from the desired TTL, instead of being dropped and rebuilt on
    every startup.

    Errors are logged and not re-raised, so a failure here does not abort
    the caller.
    """
    otp_ttl_seconds = 900  # 15 min TTL
    try:
        users_col = await get_users_collection()
        otps_col = await get_otps_collection()

        # Users indexes
        await users_col.create_index("email", sparse=True, unique=True)
        await users_col.create_index("phone", sparse=True, unique=True)
        await users_col.create_index("role")
        await users_col.create_index("isActive")

        # OTP indexes - resolve a TTL option conflict before (re)creating
        await _drop_stale_otp_ttl_index(otps_col, otp_ttl_seconds)

        await otps_col.create_index("identifier")
        await otps_col.create_index("purpose")
        await otps_col.create_index([("createdAt", 1)], expireAfterSeconds=otp_ttl_seconds)

        logger.info("Database indexes created successfully")
    except Exception as e:
        logger.error(f"Failed to create indexes: {str(e)}")


async def ensure_review_indexes():
    """Create the indexes for the ``reviews`` collection.

    Errors are logged and not re-raised.
    """
    # (keys, options) pairs, created in this order
    index_specs = (
        # Lookup of reviews by target
        ([("target_type", 1), ("target_id", 1)], {}),
        # One review per reviewer per target (unique)
        ([("reviewer_id", 1), ("target_type", 1), ("target_id", 1)], {"unique": True}),
        # Sorting by creation time
        ("created_at", {}),
    )
    try:
        reviews = (await get_db())["reviews"]
        for keys, options in index_specs:
            await reviews.create_index(keys, **options)
        logger.info("Review indexes created successfully")
    except Exception as err:
        logger.error(f"Failed to create review indexes: {str(err)}")


async def _drop_conflicting_conversation_indexes(conversations_col):
    """Drop conversation indexes that are obsolete or whose options conflict.

    Best-effort: inspection or drop failures are logged and swallowed so the
    caller can still attempt to create the indexes.
    """
    # Options each index must have to be kept; None means the index must go.
    wanted_options = {
        "participants_1": {},
        "participants_key_1": {"unique": True},
        # Old listing_id + participants compound index: blocks multiple AIDA
        # conversations because all use listing_id="system" and participants
        # contains "AIDA_BOT".
        "listing_id_1_participants_1": None,
    }

    try:
        existing = await conversations_col.index_information()
    except Exception as e:
        logger.warning(f"Could not inspect conversation indexes: {str(e)}")
        return

    for name, wanted in wanted_options.items():
        info = existing.get(name)
        if info is None:
            continue  # Index doesn't exist, that's fine

        # Compare only the index options, not the key spec / bookkeeping fields
        current = {
            key: value for key, value in info.items() if key not in ("key", "v", "ns")
        }
        if wanted is not None and current == wanted:
            continue  # Already correct; keep it (and its uniqueness guarantee) in place

        try:
            await conversations_col.drop_index(name)
            logger.info(f"Dropped problematic {name} index")
        except Exception as e:
            logger.warning(f"Could not drop index {name}: {str(e)}")


async def ensure_chat_indexes():
    """Create the indexes for the ``conversations``, ``messages`` and ``calls`` collections.

    Conversations get a unique index on ``participants_key`` (one
    conversation per user pair, not per listing), a non-unique index on
    ``participants`` and a descending index on ``updated_at``. Existing
    conversation indexes are dropped only when they are obsolete or their
    options conflict, rather than being dropped and rebuilt on every startup.

    Errors are logged and not re-raised.
    """
    try:
        database = await get_db()
        conversations_col = database["conversations"]
        messages_col = database["messages"]
        calls_col = database["calls"]

        # Conversations indexes
        await _drop_conflicting_conversation_indexes(conversations_col)

        # Unique on participants_key ("::" joined sorted IDs) avoids multikey index issues
        await conversations_col.create_index("participants_key", unique=True)
        # Keep a non-unique index on participants for queries
        await conversations_col.create_index("participants")
        # Index for sorting by last activity
        await conversations_col.create_index([("updated_at", -1)])

        # Messages indexes
        # Index for getting messages in a conversation
        await messages_col.create_index([("conversation_id", 1), ("created_at", -1)])
        # Index for querying by sender
        await messages_col.create_index("sender_id")

        # Calls indexes
        # Index for getting call history
        await calls_col.create_index([("conversation_id", 1), ("created_at", -1)])
        # Indexes for querying by caller/receiver
        await calls_col.create_index("caller_id")
        await calls_col.create_index("receiver_id")
        # Index for querying by status
        await calls_col.create_index("status")

        logger.info("Chat indexes created successfully")
    except Exception as e:
        logger.error(f"Failed to create chat indexes: {str(e)}")