# plg4-dev-server/backend/services/custom_mongo_vector.py
# Jesse Johnson
# New commit for backend deployment: 2025-09-25_13-24-03
# c59d808
"""
Streamlined MongoDB Vector Store with Atlas Vector Search
"""
from typing import List, Dict, Any, Optional, NamedTuple
import numpy as np
from langchain.schema import Document
from langchain.vectorstores.base import VectorStore
from pymongo.collection import Collection
from backend.config.logging_config import get_logger
# Module-level logger used by every log call in this file.
logger = get_logger("custom_mongo_vector")
class VectorSearchOptions(NamedTuple):
    """Configuration options for vector search.

    Immutable bundle of settings read by the vector store. Every field has a
    default, so ``VectorSearchOptions()`` is a valid configuration.
    """

    # Name of the vector search index passed as ``index`` to ``$vectorSearch``.
    index_name: str = "foodInstructionIndex"
    # Document field that holds the stored embedding vector.
    embedding_key: str = "ingredients_emb"
    # Document field used as the title of each returned result.
    text_key: str = "title"
    # Value passed as ``numCandidates`` to ``$vectorSearch``.
    num_candidates: int = 50
    # Metric used by the Python fallback: "dotProduct" selects a plain dot
    # product; any other value is treated as cosine similarity.
    similarity_metric: str = "cosine"
class CustomMongoDBVectorStore(VectorStore):
    """
    Read-only LangChain vector store backed by a MongoDB collection.

    Queries are answered with an Atlas ``$vectorSearch`` aggregation. When that
    aggregation raises or returns no results, the store falls back to scoring
    every eligible document in Python with NumPy.

    On both paths, documents whose ``needs_review`` field is present and not
    ``False`` are excluded.
    """

    # Temporary field that carries the Atlas relevance score through the
    # aggregation pipeline; it is removed from each result before use.
    _SCORE_FIELD = "_vector_score"

    def __init__(
        self,
        collection: Collection,
        embedding_function,
        options: Optional[VectorSearchOptions] = None
    ):
        """Store the collection, embedder and search options.

        Args:
            collection: MongoDB collection holding documents with embeddings.
            embedding_function: Object exposing ``embed_query(text)`` that
                returns the query vector.
            options: Search configuration; defaults to ``VectorSearchOptions()``.
        """
        self.collection = collection
        self.embedding_function = embedding_function
        self.options = options or VectorSearchOptions()
        logger.info("🔧 Streamlined MongoDB Vector Store initialized")
        logger.info(f"⚙️ Config: {self.options.index_name} index, {self.options.similarity_metric} similarity")

    @staticmethod
    def _approved_filter() -> Dict[str, Any]:
        """Return a fresh query that keeps documents not flagged for review."""
        return {
            "$or": [
                {"needs_review": {"$exists": False}},
                {"needs_review": False},
            ]
        }

    def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Score two equal-length vectors with the configured metric.

        Returns the dot product when ``similarity_metric`` is ``"dotProduct"``;
        otherwise the cosine similarity. A zero-length (all-zero) vector has no
        direction, so cosine similarity is reported as ``0.0`` instead of the
        NaN a division by zero would produce (NaN would corrupt the ranking).
        """
        a = np.asarray(vec1, dtype=float)
        b = np.asarray(vec2, dtype=float)
        dot = float(np.dot(a, b))
        if self.options.similarity_metric == "dotProduct":
            return dot

        denom = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denom == 0.0:
            return 0.0
        return dot / denom

    def _atlas_scored_search(self, qvec: List[float], k: int) -> List[tuple]:
        """Run the ``$vectorSearch`` aggregation.

        Returns:
            ``(raw_document, score)`` pairs in the order Atlas returned them.

        Raises:
            Whatever ``collection.aggregate`` raises; the caller decides
            whether to fall back.
        """
        opts = self.options
        pipeline = [
            {
                "$vectorSearch": {
                    "index": opts.index_name,
                    "path": opts.embedding_key,
                    "queryVector": qvec,
                    # Atlas rejects numCandidates < limit, so widen the
                    # candidate pool when the caller asks for a large k.
                    "numCandidates": max(opts.num_candidates, k),
                    "limit": k,
                }
            },
            # Capture the relevance score Atlas computed for each hit.
            {"$set": {self._SCORE_FIELD: {"$meta": "vectorSearchScore"}}},
            # NOTE: this filter runs after $vectorSearch has applied `limit`,
            # so fewer than k results come back when some of the nearest
            # neighbours are flagged for review.
            {"$match": self._approved_filter()},
            # The stored embedding is not needed to build the result; drop it
            # so large vectors are not sent back to the client.
            {"$project": {opts.embedding_key: 0}},
        ]

        scored = []
        for doc in self.collection.aggregate(pipeline):
            score = doc.pop(self._SCORE_FIELD, None)
            scored.append((doc, float(score) if score is not None else 0.0))
        return scored

    def _python_scored_search(self, qvec: List[float], k: int) -> List[tuple]:
        """Score every eligible document in Python and keep the best ``k``.

        Documents with a missing/empty embedding, or one whose length differs
        from the query vector, are skipped.

        Returns:
            Up to ``k`` ``(raw_document, score)`` pairs, highest score first.
        """
        opts = self.options
        cursor = self.collection.find(
            self._approved_filter(),
            {opts.text_key: 1, opts.embedding_key: 1, "ingredients": 1, "instructions": 1},
        )

        dim = len(qvec)
        scored = []
        for doc in cursor:
            emb = doc.get(opts.embedding_key)
            if emb and len(emb) == dim:
                scored.append((doc, self._calculate_similarity(qvec, emb)))

        scored.sort(key=lambda pair: pair[1], reverse=True)
        # Clamp so a negative k yields nothing rather than "all but the last".
        top = scored[:max(k, 0)]
        logger.info(f"📊 Python fallback: {len(scored)} processed, {len(top)} returned")
        return top

    def _python_similarity_search(self, qvec: List[float], k: int) -> List[Document]:
        """Python-based similarity search fallback returning Documents only."""
        top_docs = [doc for doc, _ in self._python_scored_search(qvec, k)]
        return self._create_documents(top_docs)

    def _scored_search(self, query: str, k: int) -> List[tuple]:
        """Embed ``query`` and return up to ``k`` ``(Document, score)`` pairs.

        Atlas Vector Search is tried first. If it raises or yields no results,
        the Python fallback is used instead. Errors from the embedding call are
        not caught.
        """
        logger.info(f"🔍 Searching: '{query}' (k={k})")
        if k <= 0:
            return []

        qvec = self.embedding_function.embed_query(query)

        # Primary: Atlas Vector Search (server-side). Any failure here is
        # treated as "unavailable" and triggers the fallback (best effort).
        try:
            hits = self._atlas_scored_search(qvec, k)
        except Exception as e:
            logger.warning(f"⚠️ Atlas Vector Search failed: {e}")
            hits = []

        if hits:
            logger.info(f"✅ Atlas Vector Search: {len(hits)} results")
        else:
            logger.info("🔄 Using Python similarity fallback")
            hits = self._python_scored_search(qvec, k)

        documents = self._create_documents([doc for doc, _ in hits])
        scores = [score for _, score in hits]
        return list(zip(documents, scores))

    def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
        """Return up to ``k`` Documents most similar to ``query``.

        Uses Atlas Vector Search with a Python fallback. Extra keyword
        arguments are accepted for interface compatibility and ignored.
        """
        return [doc for doc, _ in self._scored_search(query, k)]

    def _create_documents(self, docs: List[Dict]) -> List[Document]:
        """Create LangChain Documents from MongoDB results.

        ``page_content`` is "Recipe: <title>" followed by the ingredients and
        instructions sections when those fields are non-empty, separated by
        blank lines. ``metadata`` holds the stringified ``_id`` and the title.

        Raises:
            KeyError: If a result has no ``_id`` field.
        """
        documents = []
        for doc in docs:
            title = doc.get(self.options.text_key, "Untitled Recipe")
            ingredients = doc.get("ingredients", "")
            instructions = doc.get("instructions", "")

            # Build clean content without extra formatting
            content_parts = [f"Recipe: {title}"]
            if ingredients:
                content_parts.append(f"Ingredients: {ingredients}")
            if instructions:
                content_parts.append(f"Instructions: {instructions}")

            documents.append(Document(
                page_content="\n\n".join(content_parts),
                metadata={"_id": str(doc["_id"]), "title": title}
            ))
        return documents

    def similarity_search_with_score(self, query: str, k: int = 4, **kwargs: Any) -> List[tuple]:
        """Return up to ``k`` ``(Document, score)`` pairs for ``query``.

        On the Atlas path the score is the ``vectorSearchScore`` reported by
        the server; on the fallback path it is the raw cosine / dot-product
        value computed locally. Per the Atlas documentation the server score is
        normalized, so scores from the two paths are not directly comparable.
        Extra keyword arguments are accepted and ignored.
        """
        return self._scored_search(query, k)

    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs: Any) -> List[str]:
        """Read-only vector store - adding texts not supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")

    @classmethod
    def from_texts(cls, texts: List[str], embedding_function, metadatas: Optional[List[dict]] = None, **kwargs: Any):
        """Read-only vector store - creating from texts not supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")