File size: 6,568 Bytes
c59d808
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
Streamlined MongoDB Vector Store with Atlas Vector Search
"""

from typing import List, Dict, Any, Optional, NamedTuple
import numpy as np
from langchain.schema import Document
from langchain.vectorstores.base import VectorStore
from pymongo.collection import Collection
from backend.config.logging_config import get_logger

logger = get_logger("custom_mongo_vector")

class VectorSearchOptions(NamedTuple):
    """Configuration options for vector search"""
    index_name: str = "foodInstructionIndex"
    embedding_key: str = "ingredients_emb"
    text_key: str = "title"
    num_candidates: int = 50
    similarity_metric: str = "cosine"  # cosine or dotProduct

class CustomMongoDBVectorStore(VectorStore):
    """
    Streamlined MongoDB Atlas Vector Store with efficient $vectorSearch aggregation.
    Falls back to Python similarity calculation when Atlas Vector Search is unavailable.
    """
    
    def __init__(
        self,
        collection: Collection,
        embedding_function,
        options: Optional[VectorSearchOptions] = None
    ):
        self.collection = collection
        self.embedding_function = embedding_function
        self.options = options or VectorSearchOptions()
        
        logger.info(f"🔧 Streamlined MongoDB Vector Store initialized")
        logger.info(f"� Config: {self.options.index_name} index, {self.options.similarity_metric} similarity")
    
    def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Calculate similarity using the most efficient method"""
        a, b = np.array(vec1), np.array(vec2)
        
        if self.options.similarity_metric == "dotProduct":
            # Dot product (faster, good for normalized embeddings)
            return float(np.dot(a, b))
        else:
            # Cosine similarity (more robust, handles non-normalized embeddings)
            return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
    
    def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
        """Streamlined similarity search using Atlas Vector Search with Python fallback"""
        logger.info(f"🔍 Searching: '{query}' (k={k})")
        
        qvec = self.embedding_function.embed_query(query)
        
        # Primary: Try Atlas Vector Search (efficient, server-side)
        try:
            pipeline = [
                {
                    "$vectorSearch": {
                        "index": self.options.index_name,
                        "path": self.options.embedding_key,
                        "queryVector": qvec,
                        "numCandidates": self.options.num_candidates,
                        "limit": k
                    }
                },
                {
                    "$match": {
                        '$or': [
                            { 'needs_review': { '$exists': False } },
                            { 'needs_review': False }
                        ]
                    }
                }
            ]
            
            results = list(self.collection.aggregate(pipeline))
            if results:
                logger.info(f"✅ Atlas Vector Search: {len(results)} results")
                return self._create_documents(results)
                
        except Exception as e:
            logger.warning(f"⚠️ Atlas Vector Search failed: {e}")
        
        # Fallback: Python similarity calculation
        logger.info("🔄 Using Python similarity fallback")
        return self._python_similarity_search(qvec, k)
    
    def _python_similarity_search(self, qvec: List[float], k: int) -> List[Document]:
        """Efficient Python-based similarity search fallback"""
        cursor = self.collection.find(
            {'$or': [
                {'needs_review': {'$exists': False}},
                {'needs_review': False}
            ]},
            {self.options.text_key: 1, self.options.embedding_key: 1, "ingredients": 1, "instructions": 1}
        )
        
        # Vectorized similarity calculation for efficiency
        similarities = []
        for doc in cursor:
            doc_emb = doc.get(self.options.embedding_key)
            if doc_emb and len(doc_emb) == len(qvec):
                score = self._calculate_similarity(qvec, doc_emb)
                similarities.append((doc, score))
        
        # Return top-k results
        similarities.sort(key=lambda x: x[1], reverse=True)
        top_docs = [doc for doc, _ in similarities[:k]]
        
        logger.info(f"📊 Python fallback: {len(similarities)} processed, {len(top_docs)} returned")
        return self._create_documents(top_docs)
    
    def _create_documents(self, docs: List[Dict]) -> List[Document]:
        """Create LangChain Documents from MongoDB results using clean string content"""
        documents = []
        for doc in docs:
            title = doc.get(self.options.text_key, "Untitled Recipe")
            ingredients = doc.get("ingredients", "")
            instructions = doc.get("instructions", "")
            
            # Build clean content without extra formatting
            content_parts = [f"Recipe: {title}"]
            
            if ingredients:
                content_parts.append(f"Ingredients: {ingredients}")
            
            if instructions:
                content_parts.append(f"Instructions: {instructions}")
            
            content = "\n\n".join(content_parts)
            
            documents.append(Document(
                page_content=content,
                metadata={"_id": str(doc["_id"]), "title": title}
            ))
        
        return documents
    
    def similarity_search_with_score(self, query: str, k: int = 4, **kwargs: Any) -> List[tuple]:
        """Return docs with similarity scores (simplified)"""
        docs = self.similarity_search(query, k, **kwargs)
        return [(doc, 1.0) for doc in docs]  # Atlas Vector Search doesn't return raw scores
    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs: Any) -> List[str]:
        """Read-only vector store - adding texts not supported"""
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")
    
    @classmethod
    def from_texts(cls, texts: List[str], embedding_function, metadatas: Optional[List[dict]] = None, **kwargs: Any):
        """Read-only vector store - creating from texts not supported"""
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")