File size: 9,508 Bytes
bb80caa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
"""
Simple in-memory vector database for HuggingFace deployment
Replaces ChromaDB with O(N) similarity search
"""

import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Tuple
import numpy as np
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)


class SimpleVectorDB:
    """Simple in-memory vector database using numpy for similarity search.

    Chunk texts, their metadata, and their embeddings are read from JSON
    files once at construction time and kept in memory. Queries are embedded
    with ``OpenAIEmbeddings`` and ranked against every stored vector by
    cosine similarity, optionally restricted by product/version metadata.
    """
    
    def __init__(self, config=None):
        """Create the embedding client and fill the in-memory store.

        Args:
            config: Optional object exposing ``get(key, default)``. Its
                ``"rag.embedding_model"`` entry selects the embedding model
                and falls back to ``"text-embedding-3-small"``. Any falsy
                value (including ``None``) is replaced by an empty dict.
        """
        self.config = config or {}

        model_name = self.config.get("rag.embedding_model", "text-embedding-3-small")
        self.embeddings_model = OpenAIEmbeddings(model=model_name)

        # One dict per chunk (content + metadata) and the matching matrix of
        # embeddings; ``vectors`` stays None until something has been loaded.
        self.documents: List[Dict] = []
        self.vectors: Optional[np.ndarray] = None
        # Filled lazily by list_available_versions().
        self._available_versions = None

        # Read every embedding file right away.
        self._load_embeddings()
        
    def _load_embeddings(self):
        """Load all embedding files into memory.

        Every ``*.json`` file in the ``data/embeddings`` directory located two
        levels above this file is read in sorted filename order. Each file is
        expected to be a JSON object with a ``"chunks"`` list whose items carry
        ``"text"``, ``"embedding"`` and, optionally, ``"metadata"``.

        Product and version are derived from the file stem:

        * ``general_faq`` -> product ``"general"``, version ``"all"``
        * ``<product>_<version>`` -> split on the first underscore; remaining
          underscores in the version part become dots
        * anything else -> ``"unknown"`` / ``"unknown"``

        Failure handling is best-effort so one bad input cannot take the whole
        store down:

        * a file that cannot be read or parsed is logged and skipped (chunks
          accepted from it before the failure are kept);
        * a chunk whose embedding is missing, empty, non-numeric, not flat, or
          of a different length than the first accepted embedding is logged
          and skipped, which keeps ``self.vectors`` a rectangular matrix.

        On success ``self.documents`` and ``self.vectors`` are replaced and the
        cached version listing is invalidated. If nothing usable is found, the
        existing state is left untouched.
        """
        embeddings_dir = Path(__file__).parent.parent / "data" / "embeddings"

        if not embeddings_dir.exists():
            logger.warning(f"Embeddings directory not found: {embeddings_dir}")
            return

        all_documents = []
        all_vectors = []
        # Length of the first accepted embedding; later ones must match it.
        expected_dim = None

        # Load each JSON file
        for json_file in sorted(embeddings_dir.glob("*.json")):
            logger.info(f"Loading embeddings from {json_file.name}")

            try:
                # Read as UTF-8 explicitly instead of relying on the locale's
                # default encoding.
                with open(json_file, "r", encoding="utf-8") as f:
                    data = json.load(f)

                # Extract metadata from filename
                store_name = json_file.stem
                if store_name == "general_faq":
                    product = "general"
                    version = "all"
                else:
                    parts = store_name.split("_", 1)
                    if len(parts) == 2:
                        product = parts[0]
                        version = parts[1].replace("_", ".")
                    else:
                        product = "unknown"
                        version = "unknown"

                # Process chunks
                for i, chunk in enumerate(data.get("chunks", [])):
                    chunk_id = f"{store_name}_chunk_{i}"

                    # Validate the embedding before accepting the chunk so
                    # documents and vectors always stay aligned row by row.
                    try:
                        vector = np.asarray(chunk.get("embedding") or [], dtype=np.float32)
                    except (TypeError, ValueError):
                        vector = None

                    if vector is None or vector.ndim != 1 or vector.size == 0:
                        logger.warning(f"Skipping {chunk_id}: missing or malformed embedding")
                        continue

                    if expected_dim is None:
                        expected_dim = vector.size
                    elif vector.size != expected_dim:
                        logger.warning(
                            f"Skipping {chunk_id}: embedding has {vector.size} "
                            f"dimensions, expected {expected_dim}"
                        )
                        continue

                    doc = {
                        "content": chunk.get("text", ""),
                        "metadata": {
                            "product": product,
                            "version": version,
                            "store_name": store_name,
                            # Index within the file, counted before skipping.
                            "chunk_index": i,
                            "chunk_id": chunk_id,
                        },
                    }

                    # Add optional metadata if available
                    if "metadata" in chunk:
                        chunk_meta = chunk["metadata"]
                        doc["metadata"].update({
                            "source": chunk_meta.get("source", ""),
                            "page": chunk_meta.get("page", -1),
                            "document": chunk_meta.get("document", ""),
                            "token_count": chunk_meta.get("token_count", 0),
                        })

                    all_documents.append(doc)
                    all_vectors.append(vector)

            except Exception as e:
                # Deliberately broad: any problem with one file (I/O, invalid
                # JSON, unexpected structure) must not stop the other files.
                logger.error(f"Error loading {json_file.name}: {e}")
                continue

        # Convert to numpy array for efficient computation
        if all_vectors:
            self.documents = all_documents
            self.vectors = np.array(all_vectors, dtype=np.float32)
            # The document set changed, so any cached listing is stale.
            self._available_versions = None
            logger.info(f"Loaded {len(self.documents)} documents with embeddings")
        else:
            logger.warning("No embeddings loaded")
    
    def _cosine_similarity(self, query_vector: np.ndarray, vectors: np.ndarray) -> np.ndarray:
        """Compute cosine similarity between query vector and all vectors."""
        # Normalize query vector
        query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-10)
        
        # Normalize all vectors
        norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-10
        vectors_norm = vectors / norms
        
        # Compute dot product (cosine similarity)
        similarities = np.dot(vectors_norm, query_norm)
        
        return similarities
    
    def _filter_documents(self, indices: List[int], filter_dict: Optional[Dict] = None) -> List[int]:
        """Filter document indices based on metadata criteria."""
        if not filter_dict:
            return indices
            
        filtered = []
        
        for idx in indices:
            doc = self.documents[idx]
            metadata = doc["metadata"]
            
            # Handle $and operator
            if "$and" in filter_dict:
                all_match = True
                for condition in filter_dict["$and"]:
                    for key, value in condition.items():
                        if metadata.get(key) != value:
                            all_match = False
                            break
                    if not all_match:
                        break
                if all_match:
                    filtered.append(idx)
                    
            # Handle simple key-value filters
            else:
                match = True
                for key, value in filter_dict.items():
                    if isinstance(value, dict) and "$eq" in value:
                        if metadata.get(key) != value["$eq"]:
                            match = False
                            break
                    elif metadata.get(key) != value:
                        match = False
                        break
                if match:
                    filtered.append(idx)
        
        return filtered
    
    def query_with_filter(self, query: str, product: str, version: str, k: int = 5) -> List[Document]:
        """Search within a single product/version pair.

        Args:
            query: Free-text query to embed and match.
            product: Value the ``product`` metadata field must equal.
            version: Value the ``version`` metadata field must equal.
            k: Maximum number of documents to return.

        Returns:
            Whatever ``_query`` returns for the combined product-and-version
            filter.
        """
        logger.info(f"Querying {product} {version} for: {query}")

        # Both conditions have to hold at the same time.
        required = [{"product": product}, {"version": version}]
        return self._query(query, k, {"$and": required})
    
    def query_product_all_versions(self, query: str, product: str, k: int = 5) -> List[Document]:
        """Search every version of one product.

        Args:
            query: Free-text query to embed and match.
            product: Value the ``product`` metadata field must equal.
            k: Maximum number of documents to return.

        Returns:
            Whatever ``_query`` returns for the product-only filter.
        """
        logger.info(f"Querying all {product} versions for: {query}")

        # Only the product is constrained; the version is left open.
        return self._query(query, k, {"product": {"$eq": product}})
    
    def query_all_products(self, query: str, k: int = 5) -> List[Document]:
        """Search the whole store without any metadata restriction.

        Args:
            query: Free-text query to embed and match.
            k: Maximum number of documents to return.

        Returns:
            Whatever ``_query`` returns when no filter is applied.
        """
        logger.info(f"Querying all products for: {query}")

        no_filter = None
        return self._query(query, k, no_filter)
    
    def _query(self, query: str, k: int = 5, filter_dict: Optional[Dict] = None) -> List[Document]:
        """Embed ``query`` and return the most similar stored chunks.

        The query is embedded with ``self.embeddings_model``, scored against
        every stored vector by cosine similarity, ranked from most to least
        similar, optionally narrowed with ``_filter_documents``, and cut to
        the first ``k`` entries.

        Args:
            query: Free-text query.
            k: Maximum number of results. Zero or a negative value yields an
                empty list; ``None`` disables the limit.
            filter_dict: Optional metadata filter in the form accepted by
                ``_filter_documents``.

        Returns:
            LangChain ``Document`` objects in descending similarity order.
            Each carries its own copy of the chunk metadata. An empty list is
            returned when nothing is loaded, when embedding the query fails,
            or when the query embedding does not match the stored dimension.
        """
        if self.vectors is None or len(self.documents) == 0:
            logger.warning("No documents loaded")
            return []

        # A non-positive k can never produce results. Returning here avoids a
        # pointless embedding request and prevents a negative k from being
        # interpreted as "everything except the last |k| entries" by slicing.
        if k is not None and k <= 0:
            return []

        # Get query embedding
        try:
            query_embedding = self.embeddings_model.embed_query(query)
            query_vector = np.array(query_embedding, dtype=np.float32)
        except Exception as e:
            logger.error(f"Error getting query embedding: {e}")
            return []

        # The dot product below needs matching sizes; fail softly like the
        # other error paths rather than letting numpy raise.
        if query_vector.ndim != 1 or query_vector.shape[0] != self.vectors.shape[-1]:
            logger.error(
                f"Query embedding shape {query_vector.shape} does not match "
                f"stored embedding dimension {self.vectors.shape[-1]}"
            )
            return []

        # Compute similarities
        similarities = self._cosine_similarity(query_vector, self.vectors)

        # Rank every document, best match first
        ranked = np.argsort(similarities)[::-1].tolist()

        # Filter before truncating so up to k matching documents survive
        if filter_dict:
            ranked = self._filter_documents(ranked, filter_dict)

        # Convert to LangChain Document objects. The metadata is copied so a
        # caller mutating a result cannot alter the stored documents.
        results = []
        for idx in ranked[:k]:
            doc_data = self.documents[idx]
            results.append(
                Document(
                    page_content=doc_data["content"],
                    metadata=dict(doc_data["metadata"]),
                )
            )

        logger.info(f"Found {len(results)} documents")
        return results
    
    def list_available_versions(self) -> Dict[str, List[str]]:
        """List all available product versions."""
        if self._available_versions is not None:
            return self._available_versions
        
        versions_map = {}
        
        for doc in self.documents:
            product = doc["metadata"].get("product", "unknown")
            version = doc["metadata"].get("version", "unknown")
            
            if product not in versions_map:
                versions_map[product] = set()
            versions_map[product].add(version)
        
        # Convert sets to sorted lists
        self._available_versions = {
            product: sorted(list(versions))
            for product, versions in versions_map.items()
        }
        
        return self._available_versions


# Process-wide shared instance, built lazily on first request
_db_instance = None


def get_simple_vector_db(config=None) -> SimpleVectorDB:
    """Return the shared ``SimpleVectorDB``, constructing it on first use.

    Args:
        config: Passed to ``SimpleVectorDB`` only when the instance is
            created; once an instance exists this argument is not used.

    Returns:
        The single module-level ``SimpleVectorDB`` instance.
    """
    global _db_instance
    if _db_instance is not None:
        return _db_instance

    _db_instance = SimpleVectorDB(config)
    return _db_instance