Spaces:
Running
Running
File size: 5,208 Bytes
1bc6a5b 1333c38 1bc6a5b 1333c38 1bc6a5b 1333c38 c135be2 1333c38 c135be2 1333c38 c135be2 1333c38 c135be2 1333c38 1bc6a5b 1333c38 c135be2 1333c38 c135be2 1333c38 c135be2 1333c38 c135be2 1333c38 1bc6a5b 1333c38 c135be2 1bc6a5b 1333c38 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
import re
from typing import List, Dict, Any

import motor.motor_asyncio
import numpy as np
from bson import ObjectId

from config import settings
class Database:
    """Async MongoDB access layer for the product collection.

    Wraps a Motor client and exposes vector, text and category searches that
    all return results in one common dictionary shape, plus insert and
    statistics helpers. ``connect()`` must be awaited before any other
    method is used; until then ``client``, ``db`` and ``collection`` are
    ``None``.
    """

    # Name of the Atlas Vector Search index and the document field it indexes.
    # Make sure the index name matches your Atlas index name.
    VECTOR_INDEX_NAME = "vector_index"
    VECTOR_FIELD = "embedding"
    # Baseline number of nearest-neighbour candidates examined per search.
    MIN_NUM_CANDIDATES = 150
    # Upper bound Atlas documents for ``numCandidates``.
    MAX_NUM_CANDIDATES = 10000

    def __init__(self):
        """Create an unconnected instance; see ``connect()``."""
        self.client = None
        self.db = None
        self.collection = None

    async def connect(self):
        """Create the Motor client and bind the configured collection.

        Reads ``MONGODB_URI``, ``DATABASE_NAME`` and ``COLLECTION_NAME`` from
        ``settings`` and prints a confirmation line.
        """
        self.client = motor.motor_asyncio.AsyncIOMotorClient(settings.MONGODB_URI)
        self.db = self.client[settings.DATABASE_NAME]
        self.collection = self.db[settings.COLLECTION_NAME]
        print(f"✅ Connected to MongoDB: {settings.DATABASE_NAME}.{settings.COLLECTION_NAME}")

    def _build_vector_pipeline(self, query_embedding: List[float], limit: int) -> List[Dict]:
        """Return the aggregation pipeline for a ``$vectorSearch`` query."""
        # Atlas rejects the stage when numCandidates < limit, so grow the
        # candidate pool with the requested limit (up to the documented cap).
        num_candidates = min(
            max(self.MIN_NUM_CANDIDATES, limit), self.MAX_NUM_CANDIDATES
        )
        return [
            {
                "$vectorSearch": {
                    "index": self.VECTOR_INDEX_NAME,
                    "path": self.VECTOR_FIELD,
                    "queryVector": query_embedding,
                    "numCandidates": num_candidates,
                    "limit": limit,
                }
            },
            {
                "$project": {
                    "_id": 1,
                    "title": 1,
                    "category": 1,
                    "product_description": 1,
                    "final_price": 1,
                    "score": {"$meta": "vectorSearchScore"},
                }
            },
        ]

    @staticmethod
    def _literal_regex(text: str) -> Dict[str, str]:
        """Return a case-insensitive ``$regex`` filter matching *text* literally."""
        # Escape regex metacharacters so input such as "c++" or "(" is
        # searched for as plain text instead of being interpreted as a pattern.
        return {"$regex": re.escape(text), "$options": "i"}

    def _format_result(self, doc: Dict, include_score: bool = False) -> Dict:
        """Convert a product document into the result shape shared by all searches."""
        metadata = {
            "category": doc.get('category', 'N/A'),
            "price": doc.get('final_price', 'N/A'),
        }
        if include_score:
            metadata["similarity_score"] = doc.get('score', 0)
        return {
            "id": str(doc["_id"]),
            "content": self._create_product_content(doc),
            "source": doc.get('title', 'product_database'),
            "metadata": metadata,
        }

    async def similarity_search(
        self,
        query_embedding: List[float],
        limit: int = 3,
        fallback_query: str = "tops",
    ) -> List[Dict]:
        """Search for similar products using MongoDB Atlas Vector Search.

        Args:
            query_embedding: Query vector compared against the indexed field.
            limit: Maximum number of results to return.
            fallback_query: Text handed to ``search_by_text`` when the vector
                search fails. Defaults to the previously hard-coded "tops".

        Returns:
            Result dictionaries with ``id``, ``content``, ``source`` and
            ``metadata`` (including ``similarity_score``). If the vector
            search raises, the error is printed and the text-search results
            (without a score) are returned instead.
        """
        try:
            cursor = self.collection.aggregate(
                self._build_vector_pipeline(query_embedding, limit)
            )
            return [
                self._format_result(doc, include_score=True) async for doc in cursor
            ]
        except Exception as e:
            # Deliberate best-effort: any failure degrades to a text search.
            print(f"❌ Vector search error: {e}")
            # Fallback to text search
            return await self.search_by_text(fallback_query, limit)

    def _create_product_content(self, doc: Dict) -> str:
        """Create formatted product content for the LLM.

        Missing fields are rendered as ``N/A``.
        """
        content_parts = [
            f"Product: {doc.get('title', 'N/A')}",
            f"Description: {doc.get('product_description', 'N/A')}",
            f"Category: {doc.get('category', 'N/A')}",
            f"Price: ₹{doc.get('final_price', 'N/A')}"
        ]
        return ". ".join(content_parts)

    async def search_by_text(self, query: str, limit: int = 5) -> List[Dict]:
        """Fallback text search if vector search fails.

        Matches *query* literally and case-insensitively against the title,
        category and product description, returning at most *limit* results.
        """
        searched_fields = ("title", "category", "product_description")
        cursor = self.collection.find({
            "$or": [{field: self._literal_regex(query)} for field in searched_fields]
        }).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def search_by_category(self, category: str, limit: int = 5) -> List[Dict]:
        """Search products by category.

        Matches *category* literally and case-insensitively as a substring of
        the ``category`` field, returning at most *limit* results.
        """
        cursor = self.collection.find(
            {"category": self._literal_regex(category)}
        ).limit(limit)
        return [self._format_result(doc) async for doc in cursor]

    async def insert_documents(self, documents: List[Dict]) -> List[str]:
        """Insert documents into the collection.

        Returns:
            The inserted ids as strings, or an empty list when *documents*
            is empty (nothing is sent to the server in that case).
        """
        # insert_many rejects an empty batch, so treat it as a no-op.
        if not documents:
            return []
        result = await self.collection.insert_many(documents)
        return [str(inserted_id) for inserted_id in result.inserted_ids]

    async def get_collection_stats(self) -> Dict[str, Any]:
        """Get collection statistics.

        Returns:
            The total document count, the number of documents that have an
            ``embedding`` field, and the coverage as a percentage string.
        """
        total_docs = await self.collection.count_documents({})
        docs_with_embeddings = await self.collection.count_documents({"embedding": {"$exists": True}})
        return {
            "total_documents": total_docs,
            "documents_with_embeddings": docs_with_embeddings,
            # Guard against division by zero on an empty collection.
            "embedding_coverage": f"{(docs_with_embeddings/total_docs*100):.1f}%" if total_docs > 0 else "0%"
        }
# Global database instance (its client/collection stay None until connect() is awaited)
db = Database()