import re
from typing import Any, Dict, List

import motor.motor_asyncio
import numpy as np
from bson import ObjectId

from config import settings


class Database:
    """Async MongoDB access layer for the product collection.

    Wraps a Motor client and exposes vector search, regex-based text and
    category search, bulk insert and simple collection statistics. All
    search methods return dicts with the keys ``id``, ``content``,
    ``source`` and ``metadata``.
    """

    # Candidate-pool bounds for $vectorSearch. Atlas requires
    # limit <= numCandidates <= 10000.
    _MIN_NUM_CANDIDATES = 150
    _MAX_NUM_CANDIDATES = 10000

    def __init__(self):
        # Handles stay None until connect() has been awaited.
        self.client = None
        self.db = None
        self.collection = None

    async def connect(self):
        """Create the Motor client and bind the configured database/collection.

        Reads ``MONGODB_URI``, ``DATABASE_NAME`` and ``COLLECTION_NAME``
        from ``settings``.
        """
        # NOTE: Motor connects lazily, so creating the client does not by
        # itself prove the server is reachable; the message below only
        # confirms that the handles were created.
        self.client = motor.motor_asyncio.AsyncIOMotorClient(settings.MONGODB_URI)
        self.db = self.client[settings.DATABASE_NAME]
        self.collection = self.db[settings.COLLECTION_NAME]
        print(f"✅ Connected to MongoDB: {settings.DATABASE_NAME}.{settings.COLLECTION_NAME}")

    async def similarity_search(
        self,
        query_embedding: List[float],
        limit: int = 3,
        fallback_query: str = "tops",
    ) -> List[Dict]:
        """Search for similar products using MongoDB Atlas Vector Search.

        Args:
            query_embedding: Query vector compared against the ``embedding``
                field through the ``vector_index`` Atlas index.
            limit: Maximum number of results to return.
            fallback_query: Text passed to :meth:`search_by_text` when the
                vector search raises. Defaults to ``"tops"`` to preserve the
                historical behaviour; callers should pass the user's actual
                query text so the fallback stays relevant.

        Returns:
            Result dicts whose ``metadata`` additionally carries
            ``similarity_score``. Fallback results carry no score.
        """
        # Atlas rejects numCandidates < limit, so grow the pool with limit
        # instead of always sending 150.
        num_candidates = min(
            max(self._MIN_NUM_CANDIDATES, limit),
            self._MAX_NUM_CANDIDATES,
        )
        try:
            pipeline = [
                {
                    "$vectorSearch": {
                        # Make sure this matches your Atlas index name
                        "index": "vector_index",
                        "path": "embedding",
                        "queryVector": query_embedding,
                        "numCandidates": num_candidates,
                        "limit": limit,
                    }
                },
                {
                    "$project": {
                        "_id": 1,
                        "title": 1,
                        "category": 1,
                        "product_description": 1,
                        "final_price": 1,
                        "score": {"$meta": "vectorSearchScore"},
                    }
                },
            ]
            cursor = self.collection.aggregate(pipeline)
            return [
                self._format_result(doc, include_score=True)
                async for doc in cursor
            ]
        except Exception as e:
            # Deliberate best-effort: report the failure and degrade to a
            # regex text search rather than propagating the error.
            print(f"❌ Vector search error: {e}")
            return await self.search_by_text(fallback_query, limit)

    def _format_result(self, doc: Dict, include_score: bool = False) -> Dict:
        """Convert a raw product document into the common result shape.

        Args:
            doc: Document returned by MongoDB; must contain ``_id``.
            include_score: When true, add ``similarity_score`` (taken from
                the projected ``score`` field, default 0) to the metadata.
        """
        metadata = {
            "category": doc.get("category", "N/A"),
            "price": doc.get("final_price", "N/A"),
        }
        if include_score:
            metadata["similarity_score"] = doc.get("score", 0)
        return {
            "id": str(doc["_id"]),
            "content": self._create_product_content(doc),
            "source": doc.get("title", "product_database"),
            "metadata": metadata,
        }

    def _create_product_content(self, doc: Dict) -> str:
        """Create formatted product content for the LLM.

        Missing fields are rendered as ``N/A``.
        """
        content_parts = [
            f"Product: {doc.get('title', 'N/A')}",
            f"Description: {doc.get('product_description', 'N/A')}",
            f"Category: {doc.get('category', 'N/A')}",
            f"Price: ₹{doc.get('final_price', 'N/A')}",
        ]
        return ". \n".join(content_parts)

    async def search_by_text(self, query: str, limit: int = 5) -> List[Dict]:
        """Fallback text search if vector search fails.

        Performs a case-insensitive substring match of ``query`` against
        ``title``, ``category`` and ``product_description``. The query is
        matched literally: regex metacharacters in it are escaped.
        """
        # Escape so that input such as "c++" or "Tops (Women)" is neither
        # rejected as an invalid pattern nor interpreted as regex syntax.
        pattern = re.escape(query)
        searched_fields = ("title", "category", "product_description")
        cursor = self.collection.find(
            {
                "$or": [
                    {field: {"$regex": pattern, "$options": "i"}}
                    for field in searched_fields
                ]
            }
        ).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def search_by_category(self, category: str, limit: int = 5) -> List[Dict]:
        """Search products by category.

        Case-insensitive substring match on the ``category`` field; the
        category text is matched literally (regex metacharacters escaped).
        """
        cursor = self.collection.find(
            {"category": {"$regex": re.escape(category), "$options": "i"}}
        ).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def insert_documents(self, documents: List[Dict]) -> List[str]:
        """Insert documents into the collection.

        Returns:
            The inserted ids as strings, in insertion order. An empty input
            returns ``[]`` without contacting the server, because
            ``insert_many`` rejects an empty list.
        """
        if not documents:
            return []
        result = await self.collection.insert_many(documents)
        return [str(inserted_id) for inserted_id in result.inserted_ids]

    async def get_collection_stats(self):
        """Get collection statistics.

        Returns:
            A dict with ``total_documents``, ``documents_with_embeddings``
            (documents that have an ``embedding`` field) and
            ``embedding_coverage`` as a percentage string.
        """
        total_docs = await self.collection.count_documents({})
        docs_with_embeddings = await self.collection.count_documents(
            {"embedding": {"$exists": True}}
        )
        # Guard the division for an empty collection.
        if total_docs > 0:
            coverage = f"{(docs_with_embeddings/total_docs*100):.1f}%"
        else:
            coverage = "0%"
        return {
            "total_documents": total_docs,
            "documents_with_embeddings": docs_with_embeddings,
            "embedding_coverage": coverage,
        }


# Global database instance
db = Database()