""" Streamlined MongoDB Vector Store with Atlas Vector Search """ from typing import List, Dict, Any, Optional, NamedTuple import numpy as np from langchain.schema import Document from langchain.vectorstores.base import VectorStore from pymongo.collection import Collection from backend.config.logging_config import get_logger logger = get_logger("custom_mongo_vector") class VectorSearchOptions(NamedTuple): """Configuration options for vector search""" index_name: str = "foodInstructionIndex" embedding_key: str = "ingredients_emb" text_key: str = "title" num_candidates: int = 50 similarity_metric: str = "cosine" # cosine or dotProduct class CustomMongoDBVectorStore(VectorStore): """ Streamlined MongoDB Atlas Vector Store with efficient $vectorSearch aggregation. Falls back to Python similarity calculation when Atlas Vector Search is unavailable. """ def __init__( self, collection: Collection, embedding_function, options: Optional[VectorSearchOptions] = None ): self.collection = collection self.embedding_function = embedding_function self.options = options or VectorSearchOptions() logger.info(f"🔧 Streamlined MongoDB Vector Store initialized") logger.info(f"� Config: {self.options.index_name} index, {self.options.similarity_metric} similarity") def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> float: """Calculate similarity using the most efficient method""" a, b = np.array(vec1), np.array(vec2) if self.options.similarity_metric == "dotProduct": # Dot product (faster, good for normalized embeddings) return float(np.dot(a, b)) else: # Cosine similarity (more robust, handles non-normalized embeddings) return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]: """Streamlined similarity search using Atlas Vector Search with Python fallback""" logger.info(f"🔍 Searching: '{query}' (k={k})") qvec = self.embedding_function.embed_query(query) # Primary: Try Atlas Vector Search (efficient, server-side) try: pipeline = [ { "$vectorSearch": { "index": self.options.index_name, "path": self.options.embedding_key, "queryVector": qvec, "numCandidates": self.options.num_candidates, "limit": k } }, { "$match": { '$or': [ { 'needs_review': { '$exists': False } }, { 'needs_review': False } ] } } ] results = list(self.collection.aggregate(pipeline)) if results: logger.info(f"✅ Atlas Vector Search: {len(results)} results") return self._create_documents(results) except Exception as e: logger.warning(f"⚠️ Atlas Vector Search failed: {e}") # Fallback: Python similarity calculation logger.info("🔄 Using Python similarity fallback") return self._python_similarity_search(qvec, k) def _python_similarity_search(self, qvec: List[float], k: int) -> List[Document]: """Efficient Python-based similarity search fallback""" cursor = self.collection.find( {'$or': [ {'needs_review': {'$exists': False}}, {'needs_review': False} ]}, {self.options.text_key: 1, self.options.embedding_key: 1, "ingredients": 1, "instructions": 1} ) # Vectorized similarity calculation for efficiency similarities = [] for doc in cursor: doc_emb = doc.get(self.options.embedding_key) if doc_emb and len(doc_emb) == len(qvec): score = self._calculate_similarity(qvec, doc_emb) similarities.append((doc, score)) # Return top-k results similarities.sort(key=lambda x: x[1], reverse=True) top_docs = [doc for doc, _ in similarities[:k]] logger.info(f"📊 Python fallback: {len(similarities)} processed, {len(top_docs)} returned") return self._create_documents(top_docs) def _create_documents(self, docs: List[Dict]) -> List[Document]: """Create LangChain Documents from MongoDB results using clean string content""" documents = [] for doc in docs: title = doc.get(self.options.text_key, "Untitled Recipe") ingredients = doc.get("ingredients", "") instructions = doc.get("instructions", "") # Build clean content without extra formatting content_parts = [f"Recipe: {title}"] if ingredients: content_parts.append(f"Ingredients: {ingredients}") if instructions: content_parts.append(f"Instructions: {instructions}") content = "\n\n".join(content_parts) documents.append(Document( page_content=content, metadata={"_id": str(doc["_id"]), "title": title} )) return documents def similarity_search_with_score(self, query: str, k: int = 4, **kwargs: Any) -> List[tuple]: """Return docs with similarity scores (simplified)""" docs = self.similarity_search(query, k, **kwargs) return [(doc, 1.0) for doc in docs] # Atlas Vector Search doesn't return raw scores def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs: Any) -> List[str]: """Read-only vector store - adding texts not supported""" raise NotImplementedError("This vector store is read-only for pre-existing embeddings") @classmethod def from_texts(cls, texts: List[str], embedding_function, metadatas: Optional[List[dict]] = None, **kwargs: Any): """Read-only vector store - creating from texts not supported""" raise NotImplementedError("This vector store is read-only for pre-existing embeddings")