Spaces:
Running
Running
| import motor.motor_asyncio | |
| from bson import ObjectId | |
| from typing import List, Dict, Any | |
| import numpy as np | |
| from config import settings | |
class Database:
    """Async MongoDB access layer for the product collection.

    Wraps a Motor client and offers vector, text and category search plus
    bulk insert and simple collection statistics. ``connect()`` must be
    awaited before any other method is used; until then ``client``, ``db``
    and ``collection`` are ``None``.
    """

    # Lower bound for the ``numCandidates`` value sent to ``$vectorSearch``.
    _MIN_NUM_CANDIDATES = 150

    def __init__(self):
        self.client = None      # Motor client, set by connect()
        self.db = None          # database handle, set by connect()
        self.collection = None  # collection handle, set by connect()

    async def connect(self):
        """Create the Motor client and bind the configured database/collection.

        Reads ``MONGODB_URI``, ``DATABASE_NAME`` and ``COLLECTION_NAME`` from
        ``settings``.
        """
        # NOTE(review): nothing is awaited here, so the message below does not
        # prove the server is reachable -- confirm whether a ping is wanted.
        self.client = motor.motor_asyncio.AsyncIOMotorClient(settings.MONGODB_URI)
        self.db = self.client[settings.DATABASE_NAME]
        self.collection = self.db[settings.COLLECTION_NAME]
        print(f"✅ Connected to MongoDB: {settings.DATABASE_NAME}.{settings.COLLECTION_NAME}")

    async def similarity_search(
        self,
        query_embedding: List[float],
        limit: int = 3,
        fallback_query: str = "tops",
    ) -> List[Dict]:
        """Search for similar products using MongoDB Atlas Vector Search.

        Args:
            query_embedding: Query vector compared against the ``embedding`` field.
            limit: Maximum number of results to return.
            fallback_query: Text handed to ``search_by_text`` when the vector
                search raises. Defaults to ``"tops"`` for backward
                compatibility; callers should pass the user's query text so
                the fallback stays relevant.

        Returns:
            A list of result dicts (``id``, ``content``, ``source``,
            ``metadata``). Vector hits carry ``metadata["similarity_score"]``;
            fallback hits do not.
        """
        pipeline = [
            {
                "$vectorSearch": {
                    "index": "vector_index",  # Make sure this matches your Atlas index name
                    "path": "embedding",
                    "queryVector": query_embedding,
                    # numCandidates must not be smaller than limit, otherwise
                    # the stage is rejected; keep 150 as the floor.
                    "numCandidates": max(self._MIN_NUM_CANDIDATES, limit),
                    "limit": limit,
                }
            },
            {
                "$project": {
                    "_id": 1,
                    "title": 1,
                    "category": 1,
                    "product_description": 1,
                    "final_price": 1,
                    "score": {"$meta": "vectorSearchScore"},
                }
            },
        ]
        try:
            cursor = self.collection.aggregate(pipeline)
            return [
                self._format_result(doc, include_score=True)
                async for doc in cursor
            ]
        except Exception as e:
            # Deliberate best-effort: any vector-search failure degrades to
            # the regex-based text search instead of propagating.
            print(f"❌ Vector search error: {e}")
            return await self.search_by_text(fallback_query, limit)

    def _create_product_content(self, doc: Dict) -> str:
        """Create formatted product content for the LLM.

        Missing fields are rendered as ``N/A``.
        """
        content_parts = [
            f"Product: {doc.get('title', 'N/A')}",
            f"Description: {doc.get('product_description', 'N/A')}",
            f"Category: {doc.get('category', 'N/A')}",
            f"Price: ₹{doc.get('final_price', 'N/A')}",
        ]
        return ". ".join(content_parts)

    def _format_result(self, doc: Dict, *, include_score: bool = False) -> Dict[str, Any]:
        """Convert a raw MongoDB document into the result dict all searches return."""
        metadata = {
            "category": doc.get("category", "N/A"),
            "price": doc.get("final_price", "N/A"),
        }
        if include_score:
            metadata["similarity_score"] = doc.get("score", 0)
        return {
            "id": str(doc["_id"]),
            "content": self._create_product_content(doc),
            "source": doc.get("title", "product_database"),
            "metadata": metadata,
        }

    @staticmethod
    def _literal_regex(text: str) -> Dict[str, str]:
        """Build a case-insensitive ``$regex`` clause matching *text* literally."""
        import re

        # Escape so that user text such as "shirt (red" is matched verbatim
        # instead of being interpreted (or rejected) as a regex.
        return {"$regex": re.escape(text), "$options": "i"}

    async def search_by_text(self, query: str, limit: int = 5) -> List[Dict]:
        """Fallback text search if vector search fails.

        Performs a case-insensitive literal substring match of ``query``
        against ``title``, ``category`` and ``product_description``.

        Args:
            query: Text to look for; regex metacharacters are escaped.
            limit: Value passed to the cursor's ``limit()``.

        Returns:
            A list of result dicts without a similarity score.
        """
        fields = ("title", "category", "product_description")
        cursor = self.collection.find(
            {"$or": [{field: self._literal_regex(query)} for field in fields]}
        ).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def search_by_category(self, category: str, limit: int = 5) -> List[Dict]:
        """Search products by category.

        Performs a case-insensitive literal substring match on ``category``.

        Args:
            category: Category text; regex metacharacters are escaped.
            limit: Value passed to the cursor's ``limit()``.

        Returns:
            A list of result dicts without a similarity score.
        """
        cursor = self.collection.find(
            {"category": self._literal_regex(category)}
        ).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def insert_documents(self, documents: List[Dict]) -> List[str]:
        """Insert documents into the collection.

        Args:
            documents: Documents to insert. An empty list is a no-op.

        Returns:
            The inserted ids as strings, in the order reported by the driver.
        """
        if not documents:
            # insert_many rejects an empty batch; nothing to do.
            return []
        result = await self.collection.insert_many(documents)
        return [str(inserted_id) for inserted_id in result.inserted_ids]

    async def get_collection_stats(self):
        """Get collection statistics.

        Returns:
            A dict with ``total_documents``, ``documents_with_embeddings``
            (documents that have an ``embedding`` field) and
            ``embedding_coverage`` as a percentage string (``"0%"`` when the
            collection is empty).
        """
        total_docs = await self.collection.count_documents({})
        docs_with_embeddings = await self.collection.count_documents(
            {"embedding": {"$exists": True}}
        )
        if total_docs > 0:
            coverage = f"{(docs_with_embeddings / total_docs * 100):.1f}%"
        else:
            coverage = "0%"  # avoid dividing by zero on an empty collection
        return {
            "total_documents": total_docs,
            "documents_with_embeddings": docs_with_embeddings,
            "embedding_coverage": coverage,
        }
# Global database instance shared by importers of this module. Constructing it
# only sets client/db/collection to None; connect() must be awaited before use.
db = Database()