|
|
""" |
|
|
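Read-only MongoDB vector store for LangChain that queries Atlas Vector Search ($vectorSearch) and falls back to in-Python similarity ranking when Atlas returns nothing or fails.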
|
|
""" |
|
|
|
|
|
from typing import List, Dict, Any, Optional, NamedTuple |
|
|
import numpy as np |
|
|
from langchain.schema import Document |
|
|
from langchain.vectorstores.base import VectorStore |
|
|
from pymongo.collection import Collection |
|
|
from backend.config.logging_config import get_logger |
|
|
|
|
|
# Module-level logger obtained from the project's logging factory
# (backend.config.logging_config.get_logger); shared by everything below.
logger = get_logger("custom_mongo_vector")
|
|
|
|
|
class VectorSearchOptions(NamedTuple):
    """Configuration options for vector search.

    Immutable bundle of settings read by ``CustomMongoDBVectorStore``. Every
    field has a default, so ``VectorSearchOptions()`` is a complete config.
    """

    # Name of the Atlas Vector Search index passed to ``$vectorSearch``.
    index_name: str = "foodInstructionIndex"
    # Document field holding the stored embedding: the ``$vectorSearch`` path
    # and the field read by the Python fallback scan.
    embedding_key: str = "ingredients_emb"
    # Document field used as the recipe title when building Documents.
    text_key: str = "title"
    # Used for ``$vectorSearch``'s ``numCandidates`` (ANN candidate pool size).
    num_candidates: int = 50
    # Metric name consulted by the Python fallback scorer (e.g. "cosine" or
    # "dotProduct"). NOTE(review): presumably should match the similarity
    # configured on the Atlas index definition — confirm.
    similarity_metric: str = "cosine"
|
|
|
|
|
class CustomMongoDBVectorStore(VectorStore):
    """
    Read-only MongoDB vector store that searches with Atlas ``$vectorSearch``.

    Each query is embedded with ``embedding_function.embed_query`` and sent to
    Atlas Vector Search. If that aggregation raises, or returns no documents,
    the store falls back to scanning the collection and ranking embeddings in
    Python. Both paths only return documents whose ``needs_review`` field is
    absent or ``False``.

    ``similarity_search_with_score`` reports Atlas's ``vectorSearchScore`` on
    the Atlas path; the fallback maps its raw similarity onto the same
    normalised scale (see ``_to_atlas_scale``), higher meaning more similar.

    Embeddings must already exist in the collection: ``add_texts`` and
    ``from_texts`` raise ``NotImplementedError``.
    """

    def __init__(
        self,
        collection: Collection,
        embedding_function,
        options: Optional[VectorSearchOptions] = None
    ):
        """
        Args:
            collection: PyMongo collection holding documents with pre-computed
                embeddings in ``options.embedding_key``.
            embedding_function: Object whose ``embed_query(str)`` returns the
                query vector; no other method is called on it here.
            options: Search configuration; ``VectorSearchOptions()`` if omitted.
        """
        self.collection = collection
        self.embedding_function = embedding_function
        self.options = options or VectorSearchOptions()

        logger.info("🔧 Streamlined MongoDB Vector Store initialized")
        logger.info(f"⚙️ Config: {self.options.index_name} index, {self.options.similarity_metric} similarity")

    @staticmethod
    def _review_filter() -> Dict[str, Any]:
        """Return a fresh filter matching documents not flagged for review."""
        return {'$or': [
            {'needs_review': {'$exists': False}},
            {'needs_review': False},
        ]}

    def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """
        Raw similarity between two equal-length vectors (higher = more similar).

        The metric is chosen by ``options.similarity_metric``:

        * ``"dotProduct"`` -- plain dot product.
        * ``"euclidean"``  -- ``1 / (1 + distance)``, so nearer vectors score
          higher.
        * anything else   -- cosine similarity, or ``0.0`` when either vector
          has zero norm (0/0 would yield NaN and break the ranking).
        """
        a = np.asarray(vec1, dtype=float)
        b = np.asarray(vec2, dtype=float)
        metric = self.options.similarity_metric

        if metric == "dotProduct":
            return float(np.dot(a, b))
        if metric == "euclidean":
            return float(1.0 / (1.0 + np.linalg.norm(a - b)))

        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)

    def _to_atlas_scale(self, raw: float) -> float:
        """
        Map a raw fallback similarity onto Atlas's normalised score scale.

        Mirrors the normalisation documented for Atlas Vector Search scores:
        ``(1 + similarity) / 2`` for cosine / dotProduct; the euclidean branch
        of ``_calculate_similarity`` already yields ``1 / (1 + distance)``.
        NOTE(review): verify against the Atlas version in use. The mapping is
        monotonic, so it never changes the ranking.
        """
        if self.options.similarity_metric == "euclidean":
            return raw
        return (1.0 + raw) / 2.0

    def _atlas_pipeline(self, qvec: List[float], k: int) -> List[Dict[str, Any]]:
        """
        Build the ``$vectorSearch`` aggregation for the top ``k`` eligible docs.

        The ``needs_review`` filter runs as a ``$match`` *after* the vector
        search. The search therefore over-fetches (``limit`` = candidate pool)
        and a trailing ``$limit`` trims back to ``k``. Requesting only ``k``
        hits would silently return fewer than ``k`` whenever some were flagged.
        ``numCandidates`` is raised to at least ``k`` because ``limit`` may not
        exceed it.
        """
        candidates = max(self.options.num_candidates, k)
        return [
            {
                "$vectorSearch": {
                    "index": self.options.index_name,
                    "path": self.options.embedding_key,
                    "queryVector": qvec,
                    "numCandidates": candidates,
                    "limit": candidates,
                }
            },
            {"$match": self._review_filter()},
            {"$limit": k},
            {
                # Keep only the fields _create_documents reads (the stored
                # embedding is not sent back) and expose the Atlas score.
                "$project": {
                    self.options.text_key: 1,
                    "ingredients": 1,
                    "instructions": 1,
                    "score": {"$meta": "vectorSearchScore"},
                }
            },
        ]

    def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
        """
        Return up to ``k`` documents most similar to ``query``, best first.

        Uses Atlas Vector Search, with a Python-side scan as fallback. Extra
        ``kwargs`` are accepted for interface compatibility and ignored.
        """
        return [doc for doc, _score in self._search_with_scores(query, k)]

    def similarity_search_with_score(self, query: str, k: int = 4, **kwargs: Any) -> List[tuple]:
        """
        Return up to ``k`` ``(Document, score)`` pairs for ``query``, best first.

        Scores are Atlas ``vectorSearchScore`` values, or the fallback's
        similarity mapped onto the same scale; higher means more similar.
        Extra ``kwargs`` are ignored.
        """
        return self._search_with_scores(query, k)

    def _search_with_scores(self, query: str, k: int) -> List[tuple]:
        """
        Shared search path: Atlas ``$vectorSearch`` first, Python scan as fallback.

        Returns up to ``k`` ``(Document, score)`` pairs, best first. ``k <= 0``
        returns ``[]`` without embedding the query or touching the database.
        """
        logger.info(f"🔍 Searching: '{query}' (k={k})")
        if k <= 0:
            return []

        qvec = self.embedding_function.embed_query(query)

        try:
            results = list(self.collection.aggregate(self._atlas_pipeline(qvec, k)))
        except Exception as e:
            # Deliberately broad: any failure of the Atlas path (e.g. a
            # non-Atlas deployment or a network error) degrades to the scan.
            logger.warning(f"⚠️ Atlas Vector Search failed: {e}")
        else:
            if results:
                logger.info(f"✅ Atlas Vector Search: {len(results)} results")
                documents = self._create_documents(results)
                return [
                    (doc, float(raw.get("score", 0.0)))
                    for doc, raw in zip(documents, results)
                ]

        # Reached on an Atlas error or an empty Atlas result.
        logger.info("🔄 Using Python similarity fallback")
        return self._python_scored_search(qvec, k)

    def _python_similarity_search(self, qvec: List[float], k: int) -> List[Document]:
        """Python-side fallback returning only the top ``k`` Documents."""
        return [doc for doc, _score in self._python_scored_search(qvec, k)]

    def _python_scored_search(self, qvec: List[float], k: int) -> List[tuple]:
        """
        Rank every eligible document in Python and return the top ``k``.

        Documents whose embedding is missing, or whose length differs from the
        query vector, are skipped. Returns ``(Document, score)`` pairs, best
        first, with scores mapped through ``_to_atlas_scale``.
        """
        import heapq

        projection = {
            self.options.text_key: 1,
            self.options.embedding_key: 1,
            "ingredients": 1,
            "instructions": 1,
        }
        cursor = self.collection.find(self._review_filter(), projection)

        processed = 0

        def scored_docs():
            nonlocal processed
            for doc in cursor:
                emb = doc.get(self.options.embedding_key)
                if not emb or len(emb) != len(qvec):
                    continue
                score = self._calculate_similarity(qvec, emb)
                processed += 1
                if np.isnan(score):
                    continue  # NaN compares unordered and would corrupt ranking
                yield doc, score

        # nlargest holds only k candidates (not every document + embedding) and
        # is equivalent to a stable descending sort, so ties keep scan order.
        top = heapq.nlargest(k, scored_docs(), key=lambda pair: pair[1])

        logger.info(f"📊 Python fallback: {processed} processed, {len(top)} returned")

        documents = self._create_documents([doc for doc, _score in top])
        return [
            (document, self._to_atlas_scale(score))
            for document, (_doc, score) in zip(documents, top)
        ]

    @staticmethod
    def _as_text(value: Any, sep: str) -> str:
        """Render a field as text, joining list/tuple items with ``sep``."""
        # NOTE(review): the stored schema isn't visible here; plain strings pass
        # through unchanged and only sequence-valued fields are flattened, so
        # they don't leak a Python list repr into the page content.
        if isinstance(value, (list, tuple)):
            return sep.join(str(item) for item in value)
        return str(value)

    def _create_documents(self, docs: List[Dict]) -> List[Document]:
        """
        Convert MongoDB recipe documents into LangChain ``Document`` objects.

        ``page_content`` is plain text built from up to three sections
        separated by blank lines: ``Recipe: <title>``, ``Ingredients: ...``
        and ``Instructions: ...``. Empty ingredient/instruction fields are
        omitted. ``metadata`` holds the stringified ``_id`` and the title.
        """
        documents = []
        for doc in docs:
            # `or` also covers a title field that is present but null/empty.
            title = doc.get(self.options.text_key) or "Untitled Recipe"
            ingredients = doc.get("ingredients")
            instructions = doc.get("instructions")

            content_parts = [f"Recipe: {title}"]
            if ingredients:
                ingredients_text = self._as_text(ingredients, ", ")
                content_parts.append(f"Ingredients: {ingredients_text}")
            if instructions:
                instructions_text = self._as_text(instructions, "\n")
                content_parts.append(f"Instructions: {instructions_text}")

            documents.append(Document(
                page_content="\n\n".join(content_parts),
                metadata={"_id": str(doc["_id"]), "title": title}
            ))

        return documents

    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs: Any) -> List[str]:
        """Read-only vector store - adding texts not supported"""
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")

    @classmethod
    def from_texts(cls, texts: List[str], embedding_function, metadatas: Optional[List[dict]] = None, **kwargs: Any):
        """Read-only vector store - creating from texts not supported"""
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")
|
|
|