# d-commerce / database.py
# Crcs1225
# hey
# c135be2
import re
from typing import List, Dict, Any

import motor.motor_asyncio
import numpy as np
from bson import ObjectId

from config import settings
class Database:
    """Async access layer for the product collection stored in MongoDB.

    The instance starts unconnected; ``connect()`` must be awaited before any
    query or insert method is used.
    """

    # Document fields inspected by the regex-based text search.
    _TEXT_SEARCH_FIELDS = ("title", "category", "product_description")

    def __init__(self):
        # All handles stay None until connect() is awaited.
        self.client = None
        self.db = None
        self.collection = None

    async def connect(self):
        """Create the Motor client and bind the configured database and collection."""
        self.client = motor.motor_asyncio.AsyncIOMotorClient(settings.MONGODB_URI)
        self.db = self.client[settings.DATABASE_NAME]
        self.collection = self.db[settings.COLLECTION_NAME]
        print(f"✅ Connected to MongoDB: {settings.DATABASE_NAME}.{settings.COLLECTION_NAME}")

    def _require_collection(self):
        """Return the bound collection, or raise if connect() has not run yet."""
        # Compare with `is None`: driver collection objects do not support
        # truth-value testing.
        if self.collection is None:
            raise RuntimeError("Database is not connected; call connect() first")
        return self.collection

    def _format_result(self, doc: Dict, include_score: bool = False) -> Dict[str, Any]:
        """Convert one raw product document into the result dict returned to callers."""
        metadata = {
            "category": doc.get('category', 'N/A'),
            "price": doc.get('final_price', 'N/A'),
        }
        if include_score:
            metadata["similarity_score"] = doc.get('score', 0)
        return {
            "id": str(doc["_id"]),
            "content": self._create_product_content(doc),
            "source": doc.get('title', 'product_database'),
            "metadata": metadata,
        }

    async def _collect(self, cursor, include_score: bool = False) -> List[Dict]:
        """Drain an async cursor into a list of formatted results."""
        return [self._format_result(doc, include_score) async for doc in cursor]

    async def similarity_search(
        self,
        query_embedding: List[float],
        limit: int = 3,
        *,
        fallback_query: str = "tops",
        index_name: str = "vector_index",
        num_candidates: int = 150,
    ) -> List[Dict]:
        """Search for similar products using MongoDB Atlas Vector Search.

        Args:
            query_embedding: Query vector; any iterable of numbers is accepted
                and converted to a plain list of floats before being sent.
            limit: Maximum number of results to return.
            fallback_query: Text handed to ``search_by_text`` when the vector
                search fails.
            index_name: Name of the vector index; it must match the index
                defined on the collection.
            num_candidates: Candidate pool size for the search. It is raised
                to ``limit`` when smaller, since the stage does not accept
                fewer candidates than requested results.

        Returns:
            Result dicts with ``id``, ``content``, ``source`` and ``metadata``
            (``metadata`` includes ``similarity_score``). If the vector search
            raises, the results of the text-search fallback are returned
            instead (those carry no ``similarity_score``).

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        collection = self._require_collection()
        try:
            # Plain floats keep the vector encodable even when the caller
            # passes a numpy array or numpy scalar values.
            query_vector = [float(value) for value in query_embedding]
            pipeline = [
                {
                    "$vectorSearch": {
                        "index": index_name,
                        "path": "embedding",
                        "queryVector": query_vector,
                        "numCandidates": max(num_candidates, limit),
                        "limit": limit,
                    }
                },
                {
                    "$project": {
                        "_id": 1,
                        "title": 1,
                        "category": 1,
                        "product_description": 1,
                        "final_price": 1,
                        "score": {"$meta": "vectorSearchScore"},
                    }
                },
            ]
            return await self._collect(collection.aggregate(pipeline), include_score=True)
        except Exception as e:
            # Deliberate best-effort behaviour: report the failure and fall
            # back to the regex text search rather than propagating.
            print(f"❌ Vector search error: {e}")
            return await self.search_by_text(fallback_query, limit)

    def _create_product_content(self, doc: Dict) -> str:
        """Create formatted product content for the LLM."""
        content_parts = [
            f"Product: {doc.get('title', 'N/A')}",
            f"Description: {doc.get('product_description', 'N/A')}",
            f"Category: {doc.get('category', 'N/A')}",
            f"Price: ₹{doc.get('final_price', 'N/A')}",
        ]
        return ". ".join(content_parts)

    async def search_by_text(self, query: str, limit: int = 5) -> List[Dict]:
        """Fallback text search if vector search fails.

        Performs a case-insensitive substring match of ``query`` against the
        title, category and product description fields. ``query`` is matched
        literally: regex metacharacters in it are escaped.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        collection = self._require_collection()
        literal = re.escape(query)
        conditions = [
            {field: {"$regex": literal, "$options": "i"}}
            for field in self._TEXT_SEARCH_FIELDS
        ]
        cursor = collection.find({"$or": conditions}).limit(limit)
        return await self._collect(cursor)

    async def search_by_category(self, category: str, limit: int = 5) -> List[Dict]:
        """Search products by category.

        Performs a case-insensitive substring match on the ``category`` field.
        ``category`` is matched literally: regex metacharacters are escaped.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        collection = self._require_collection()
        cursor = collection.find(
            {"category": {"$regex": re.escape(category), "$options": "i"}}
        ).limit(limit)
        return await self._collect(cursor)

    async def insert_documents(self, documents: List[Dict]) -> List[str]:
        """Insert documents into the collection.

        Returns:
            The inserted ids as strings, in insertion order. An empty input
            returns an empty list without contacting the server.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        collection = self._require_collection()
        if not documents:
            # insert_many rejects an empty batch, so short-circuit here.
            return []
        result = await collection.insert_many(documents)
        return [str(inserted_id) for inserted_id in result.inserted_ids]

    async def get_collection_stats(self):
        """Get collection statistics.

        Returns:
            A dict with ``total_documents``, ``documents_with_embeddings``
            (documents that have an ``embedding`` field) and
            ``embedding_coverage`` as a percentage string.

        Raises:
            RuntimeError: If ``connect()`` has not been awaited.
        """
        collection = self._require_collection()
        total_docs = await collection.count_documents({})
        docs_with_embeddings = await collection.count_documents({"embedding": {"$exists": True}})
        if total_docs > 0:
            coverage = f"{(docs_with_embeddings/total_docs*100):.1f}%"
        else:
            # Avoid dividing by zero on an empty collection.
            coverage = "0%"
        return {
            "total_documents": total_docs,
            "documents_with_embeddings": docs_with_embeddings,
            "embedding_coverage": coverage,
        }
# Global database instance: a single module-level Database shared by importers.
# It is created unconnected (client/db/collection are None); connect() must be
# awaited before any query or insert method is called on it.
db = Database()