Spaces:
Sleeping
Sleeping
| """ | |
| Simple in-memory vector database for HuggingFace deployment | |
| Replaces ChromaDB with O(N) similarity search | |
| """ | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from typing import List, Dict, Optional, Tuple | |
| import numpy as np | |
| from langchain.schema import Document | |
| from langchain_openai import OpenAIEmbeddings | |
| logger = logging.getLogger(__name__) | |
class SimpleVectorDB:
    """Simple in-memory vector database using numpy for similarity search.

    Loads pre-computed embedding JSON files from ``data/embeddings`` at
    construction time and answers queries by brute-force O(N) cosine
    similarity over a single numpy matrix.
    """

    def __init__(self, config=None):
        """Initialize the vector database.

        Args:
            config: Optional mapping supporting ``.get(key, default)``.
                The ``"rag.embedding_model"`` key selects the OpenAI
                embedding model (default ``"text-embedding-3-small"``).
        """
        self.config = config or {}
        self.embeddings_model = OpenAIEmbeddings(
            model=self.config.get("rag.embedding_model", "text-embedding-3-small")
        )
        # Parallel storage: self.documents[i] describes self.vectors[i].
        self.documents: List[Dict] = []
        self.vectors: Optional[np.ndarray] = None
        # Lazily-built cache for list_available_versions().
        self._available_versions = None
        # Load embeddings eagerly so queries are ready immediately.
        self._load_embeddings()

    @staticmethod
    def _parse_store_name(store_name: str) -> Tuple[str, str]:
        """Derive ``(product, version)`` from an embeddings file stem.

        ``general_faq`` maps to ``("general", "all")``; otherwise the stem is
        split once on ``_`` into product and version, with underscores in the
        version part converted to dots (e.g. ``widget_2_1`` -> ``("widget", "2.1")``).
        """
        if store_name == "general_faq":
            return "general", "all"
        parts = store_name.split("_", 1)
        if len(parts) == 2:
            return parts[0], parts[1].replace("_", ".")
        return "unknown", "unknown"

    def _load_embeddings(self):
        """Load all embedding JSON files into memory.

        Each file is expected to look like
        ``{"chunks": [{"text": ..., "embedding": [...], "metadata": {...}}]}``.
        Corrupt files are logged and skipped so one bad file does not block
        the rest. Chunks without an embedding vector are skipped: appending
        an empty row would make the ``np.array`` conversion below fail on
        ragged input.
        """
        embeddings_dir = Path(__file__).parent.parent / "data" / "embeddings"

        if not embeddings_dir.exists():
            logger.warning("Embeddings directory not found: %s", embeddings_dir)
            return

        all_documents = []
        all_vectors = []

        for json_file in sorted(embeddings_dir.glob("*.json")):
            logger.info("Loading embeddings from %s", json_file.name)
            try:
                # JSON is UTF-8 by spec; pin the encoding rather than
                # relying on the platform default.
                with open(json_file, 'r', encoding="utf-8") as f:
                    data = json.load(f)

                store_name = json_file.stem
                product, version = self._parse_store_name(store_name)

                for i, chunk in enumerate(data.get("chunks", [])):
                    embedding = chunk.get("embedding")
                    if not embedding:
                        # Missing/empty embedding would produce a ragged
                        # matrix; skip the chunk instead of crashing later.
                        logger.warning(
                            "Skipping chunk %d in %s: no embedding", i, json_file.name
                        )
                        continue

                    doc = {
                        "content": chunk.get("text", ""),
                        "metadata": {
                            "product": product,
                            "version": version,
                            "store_name": store_name,
                            "chunk_index": i,
                            "chunk_id": f"{store_name}_chunk_{i}"
                        }
                    }

                    # Merge optional per-chunk metadata when present.
                    if "metadata" in chunk:
                        chunk_meta = chunk["metadata"]
                        doc["metadata"].update({
                            "source": chunk_meta.get("source", ""),
                            "page": chunk_meta.get("page", -1),
                            "document": chunk_meta.get("document", ""),
                            "token_count": chunk_meta.get("token_count", 0)
                        })

                    all_documents.append(doc)
                    all_vectors.append(embedding)

            except Exception as e:
                # Best-effort loading: log and continue with the other files.
                logger.error("Error loading %s: %s", json_file.name, e)
                continue

        if all_vectors:
            self.documents = all_documents
            self.vectors = np.array(all_vectors, dtype=np.float32)
            logger.info("Loaded %d documents with embeddings", len(self.documents))
        else:
            logger.warning("No embeddings loaded")

    def _cosine_similarity(self, query_vector: np.ndarray, vectors: np.ndarray) -> np.ndarray:
        """Compute cosine similarity between ``query_vector`` and every row of ``vectors``.

        Both sides are normalized with a small epsilon to avoid division by
        zero on all-zero vectors. Returns a 1-D array of similarities.
        """
        query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-10)
        norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-10
        vectors_norm = vectors / norms
        # Dot product of unit vectors == cosine similarity.
        return np.dot(vectors_norm, query_norm)

    def _filter_documents(self, indices: List[int], filter_dict: Optional[Dict] = None) -> List[int]:
        """Filter document indices by metadata, preserving input order.

        Supports a ChromaDB-like subset: a top-level ``"$and"`` list of
        single-key equality conditions, or a flat mapping of key to value
        (where a value may be ``{"$eq": x}``). ``None``/empty filter returns
        ``indices`` unchanged.
        """
        if not filter_dict:
            return indices

        filtered = []
        for idx in indices:
            metadata = self.documents[idx]["metadata"]

            if "$and" in filter_dict:
                # Every condition in the $and list must match.
                all_match = True
                for condition in filter_dict["$and"]:
                    for key, value in condition.items():
                        if metadata.get(key) != value:
                            all_match = False
                            break
                    if not all_match:
                        break
                if all_match:
                    filtered.append(idx)
            else:
                # Flat key/value filters; values may be wrapped in {"$eq": x}.
                match = True
                for key, value in filter_dict.items():
                    if isinstance(value, dict) and "$eq" in value:
                        if metadata.get(key) != value["$eq"]:
                            match = False
                            break
                    elif metadata.get(key) != value:
                        match = False
                        break
                if match:
                    filtered.append(idx)

        return filtered

    def query_with_filter(self, query: str, product: str, version: str, k: int = 5) -> List["Document"]:
        """Query restricted to one exact product/version pair."""
        logger.info(f"Querying {product} {version} for: {query}")
        filter_dict = {"$and": [{"product": product}, {"version": version}]}
        return self._query(query, k, filter_dict)

    def query_product_all_versions(self, query: str, product: str, k: int = 5) -> List["Document"]:
        """Query across all versions of a single product."""
        logger.info(f"Querying all {product} versions for: {query}")
        filter_dict = {"product": {"$eq": product}}
        return self._query(query, k, filter_dict)

    def query_all_products(self, query: str, k: int = 5) -> List["Document"]:
        """Query across all products and versions (no metadata filter)."""
        logger.info(f"Querying all products for: {query}")
        return self._query(query, k, None)

    def _query(self, query: str, k: int = 5, filter_dict: Optional[Dict] = None) -> List["Document"]:
        """Embed ``query``, rank all documents by cosine similarity, filter, take top ``k``.

        Returns an empty list when no documents are loaded or the embedding
        call fails (errors are logged, not raised).
        """
        if self.vectors is None or len(self.documents) == 0:
            logger.warning("No documents loaded")
            return []

        try:
            query_embedding = self.embeddings_model.embed_query(query)
            query_vector = np.array(query_embedding, dtype=np.float32)
        except Exception as e:
            logger.error("Error getting query embedding: %s", e)
            return []

        similarities = self._cosine_similarity(query_vector, self.vectors)

        # Rank every document first, then filter, so the top-k after
        # filtering is still the globally best-matching subset.
        top_indices = np.argsort(similarities)[::-1]

        if filter_dict:
            top_indices = self._filter_documents(top_indices.tolist(), filter_dict)

        top_indices = top_indices[:k]

        results = []
        for idx in top_indices:
            doc_data = self.documents[idx]
            results.append(Document(
                page_content=doc_data["content"],
                metadata=doc_data["metadata"]
            ))

        logger.info("Found %d documents", len(results))
        return results

    def list_available_versions(self) -> Dict[str, List[str]]:
        """Return a mapping of product -> sorted list of available versions.

        The result is computed once from loaded document metadata and cached
        for subsequent calls.
        """
        if self._available_versions is not None:
            return self._available_versions

        versions_map = {}
        for doc in self.documents:
            product = doc["metadata"].get("product", "unknown")
            version = doc["metadata"].get("version", "unknown")
            if product not in versions_map:
                versions_map[product] = set()
            versions_map[product].add(version)

        self._available_versions = {
            product: sorted(versions)
            for product, versions in versions_map.items()
        }
        return self._available_versions
# Process-wide cache holding the single shared SimpleVectorDB.
_db_instance = None


def get_simple_vector_db(config=None) -> SimpleVectorDB:
    """Return the shared SimpleVectorDB, building it on first use.

    Note: ``config`` is honored only on the very first call; subsequent
    calls return the already-constructed instance unchanged.
    """
    global _db_instance
    if _db_instance is None:
        _db_instance = SimpleVectorDB(config)
    return _db_instance