""" Simple in-memory vector database for HuggingFace deployment Replaces ChromaDB with O(N) similarity search """ import json import logging from pathlib import Path from typing import List, Dict, Optional, Tuple import numpy as np from langchain.schema import Document from langchain_openai import OpenAIEmbeddings logger = logging.getLogger(__name__) class SimpleVectorDB: """Simple in-memory vector database using numpy for similarity search.""" def __init__(self, config=None): """Initialize the vector database.""" self.config = config or {} self.embeddings_model = OpenAIEmbeddings( model=self.config.get("rag.embedding_model", "text-embedding-3-small") ) # Storage for documents and vectors self.documents: List[Dict] = [] self.vectors: Optional[np.ndarray] = None self._available_versions = None # Load embeddings on initialization self._load_embeddings() def _load_embeddings(self): """Load all embedding files into memory.""" embeddings_dir = Path(__file__).parent.parent / "data" / "embeddings" if not embeddings_dir.exists(): logger.warning(f"Embeddings directory not found: {embeddings_dir}") return all_documents = [] all_vectors = [] # Load each JSON file for json_file in sorted(embeddings_dir.glob("*.json")): logger.info(f"Loading embeddings from {json_file.name}") try: with open(json_file, 'r') as f: data = json.load(f) # Extract metadata from filename store_name = json_file.stem if store_name == "general_faq": product = "general" version = "all" else: parts = store_name.split("_", 1) if len(parts) == 2: product = parts[0] version = parts[1].replace("_", ".") else: product = "unknown" version = "unknown" # Process chunks for i, chunk in enumerate(data.get("chunks", [])): doc = { "content": chunk.get("text", ""), "metadata": { "product": product, "version": version, "store_name": store_name, "chunk_index": i, "chunk_id": f"{store_name}_chunk_{i}" } } # Add optional metadata if available if "metadata" in chunk: chunk_meta = chunk["metadata"] doc["metadata"].update({ "source": chunk_meta.get("source", ""), "page": chunk_meta.get("page", -1), "document": chunk_meta.get("document", ""), "token_count": chunk_meta.get("token_count", 0) }) all_documents.append(doc) all_vectors.append(chunk.get("embedding", [])) except Exception as e: logger.error(f"Error loading {json_file.name}: {e}") continue # Convert to numpy array for efficient computation if all_vectors: self.documents = all_documents self.vectors = np.array(all_vectors, dtype=np.float32) logger.info(f"Loaded {len(self.documents)} documents with embeddings") else: logger.warning("No embeddings loaded") def _cosine_similarity(self, query_vector: np.ndarray, vectors: np.ndarray) -> np.ndarray: """Compute cosine similarity between query vector and all vectors.""" # Normalize query vector query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-10) # Normalize all vectors norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-10 vectors_norm = vectors / norms # Compute dot product (cosine similarity) similarities = np.dot(vectors_norm, query_norm) return similarities def _filter_documents(self, indices: List[int], filter_dict: Optional[Dict] = None) -> List[int]: """Filter document indices based on metadata criteria.""" if not filter_dict: return indices filtered = [] for idx in indices: doc = self.documents[idx] metadata = doc["metadata"] # Handle $and operator if "$and" in filter_dict: all_match = True for condition in filter_dict["$and"]: for key, value in condition.items(): if metadata.get(key) != value: all_match = False break if not all_match: break if all_match: filtered.append(idx) # Handle simple key-value filters else: match = True for key, value in filter_dict.items(): if isinstance(value, dict) and "$eq" in value: if metadata.get(key) != value["$eq"]: match = False break elif metadata.get(key) != value: match = False break if match: filtered.append(idx) return filtered def query_with_filter(self, query: str, product: str, version: str, k: int = 5) -> List[Document]: """Query with product and version filter.""" logger.info(f"Querying {product} {version} for: {query}") filter_dict = {"$and": [{"product": product}, {"version": version}]} return self._query(query, k, filter_dict) def query_product_all_versions(self, query: str, product: str, k: int = 5) -> List[Document]: """Query across all versions of a product.""" logger.info(f"Querying all {product} versions for: {query}") filter_dict = {"product": {"$eq": product}} return self._query(query, k, filter_dict) def query_all_products(self, query: str, k: int = 5) -> List[Document]: """Query across all products and versions.""" logger.info(f"Querying all products for: {query}") return self._query(query, k, None) def _query(self, query: str, k: int = 5, filter_dict: Optional[Dict] = None) -> List[Document]: """Internal query method.""" if self.vectors is None or len(self.documents) == 0: logger.warning("No documents loaded") return [] # Get query embedding try: query_embedding = self.embeddings_model.embed_query(query) query_vector = np.array(query_embedding, dtype=np.float32) except Exception as e: logger.error(f"Error getting query embedding: {e}") return [] # Compute similarities similarities = self._cosine_similarity(query_vector, self.vectors) # Get top k indices top_indices = np.argsort(similarities)[::-1] # Sort descending # Apply filters if filter_dict: top_indices = self._filter_documents(top_indices.tolist(), filter_dict) # Take top k after filtering top_indices = top_indices[:k] # Convert to LangChain Document objects results = [] for idx in top_indices: doc_data = self.documents[idx] doc = Document( page_content=doc_data["content"], metadata=doc_data["metadata"] ) results.append(doc) logger.info(f"Found {len(results)} documents") return results def list_available_versions(self) -> Dict[str, List[str]]: """List all available product versions.""" if self._available_versions is not None: return self._available_versions versions_map = {} for doc in self.documents: product = doc["metadata"].get("product", "unknown") version = doc["metadata"].get("version", "unknown") if product not in versions_map: versions_map[product] = set() versions_map[product].add(version) # Convert sets to sorted lists self._available_versions = { product: sorted(list(versions)) for product, versions in versions_map.items() } return self._available_versions # Create a singleton instance _db_instance = None def get_simple_vector_db(config=None) -> SimpleVectorDB: """Get or create the singleton vector database instance.""" global _db_instance if _db_instance is None: _db_instance = SimpleVectorDB(config) return _db_instance