# v3_ai_assistant/py/backend/simple_vector_db.py
# Author: Julian Vanecek
# Initial commit: AI Assistant Multi-Agent System for HuggingFace Spaces (bb80caa)
"""
Simple in-memory vector database for HuggingFace deployment
Replaces ChromaDB with O(N) similarity search
"""
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional, Tuple
import numpy as np
from langchain.schema import Document
from langchain_openai import OpenAIEmbeddings
logger = logging.getLogger(__name__)
class SimpleVectorDB:
    """Simple in-memory vector database using numpy for similarity search.

    Embedding files are read once at construction time from
    ``<repo>/data/embeddings/*.json`` and kept as parallel structures:
    ``self.documents[i]`` holds the text/metadata for row ``i`` of the
    float32 matrix ``self.vectors``.  Queries are answered by embedding the
    query text via OpenAI and doing a brute-force O(N) cosine-similarity
    scan over that matrix (ChromaDB replacement for constrained deploys).
    """

    def __init__(self, config=None):
        """Initialize the vector database and eagerly load embeddings.

        Args:
            config: Optional mapping supporting ``.get(key, default)``.
                Only the "rag.embedding_model" key is read here.
        """
        self.config = config or {}
        self.embeddings_model = OpenAIEmbeddings(
            model=self.config.get("rag.embedding_model", "text-embedding-3-small")
        )
        # Parallel storage: documents[i] corresponds to row i of vectors.
        self.documents: List[Dict] = []
        self.vectors: Optional[np.ndarray] = None
        # Lazily-built cache for list_available_versions().
        self._available_versions: Optional[Dict[str, List[str]]] = None
        # Load embeddings on initialization
        self._load_embeddings()

    @staticmethod
    def _parse_store_name(store_name: str) -> Tuple[str, str]:
        """Derive ``(product, version)`` from an embeddings file stem.

        "general_faq" maps to ("general", "all"); any other stem is expected
        to look like "<product>_<version parts>" where underscores after the
        first become dots (e.g. "acme_2_1" -> ("acme", "2.1")).  A stem with
        no underscore yields ("unknown", "unknown").
        """
        if store_name == "general_faq":
            return "general", "all"
        parts = store_name.split("_", 1)
        if len(parts) == 2:
            return parts[0], parts[1].replace("_", ".")
        return "unknown", "unknown"

    def _load_embeddings(self):
        """Load all embedding JSON files into memory.

        Each file is expected to contain ``{"chunks": [{"text", "embedding",
        "metadata"?}, ...]}``.  Chunks with a missing/empty embedding are
        skipped (a ragged row would make the ``np.array`` conversion below
        fail or yield an object array instead of a float32 matrix).  A file
        that fails to parse is logged and skipped.
        """
        embeddings_dir = Path(__file__).parent.parent / "data" / "embeddings"
        if not embeddings_dir.exists():
            logger.warning(f"Embeddings directory not found: {embeddings_dir}")
            return
        all_documents = []
        all_vectors = []
        # Load each JSON file (sorted for a deterministic row order).
        for json_file in sorted(embeddings_dir.glob("*.json")):
            logger.info(f"Loading embeddings from {json_file.name}")
            try:
                # Explicit encoding: JSON is UTF-8 regardless of platform.
                with open(json_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                # Extract product/version metadata from the filename.
                store_name = json_file.stem
                product, version = self._parse_store_name(store_name)
                # Process chunks
                for i, chunk in enumerate(data.get("chunks", [])):
                    vector = chunk.get("embedding", [])
                    if not vector:
                        # An empty row would break the matrix conversion;
                        # drop the chunk rather than the whole store.
                        logger.warning(
                            f"Skipping chunk {i} of {json_file.name}: missing embedding"
                        )
                        continue
                    doc = {
                        "content": chunk.get("text", ""),
                        "metadata": {
                            "product": product,
                            "version": version,
                            "store_name": store_name,
                            "chunk_index": i,
                            "chunk_id": f"{store_name}_chunk_{i}"
                        }
                    }
                    # Add optional metadata if available
                    if "metadata" in chunk:
                        chunk_meta = chunk["metadata"]
                        doc["metadata"].update({
                            "source": chunk_meta.get("source", ""),
                            "page": chunk_meta.get("page", -1),
                            "document": chunk_meta.get("document", ""),
                            "token_count": chunk_meta.get("token_count", 0)
                        })
                    all_documents.append(doc)
                    all_vectors.append(vector)
            except Exception as e:
                # Best-effort: one corrupt file must not prevent the
                # remaining stores from loading.
                logger.error(f"Error loading {json_file.name}: {e}")
                continue
        # Convert to numpy array for efficient computation
        if all_vectors:
            self.documents = all_documents
            self.vectors = np.array(all_vectors, dtype=np.float32)
            logger.info(f"Loaded {len(self.documents)} documents with embeddings")
        else:
            logger.warning("No embeddings loaded")

    def _cosine_similarity(self, query_vector: np.ndarray, vectors: np.ndarray) -> np.ndarray:
        """Compute cosine similarity between the query vector and all rows.

        Args:
            query_vector: 1-D array of shape (dim,).
            vectors: 2-D array of shape (n, dim).

        Returns:
            1-D array of n similarity scores.  The ``1e-10`` terms guard
            against division by zero for all-zero vectors.
        """
        # Normalize the query vector.
        query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-10)
        # Normalize all stored vectors row-wise.
        norms = np.linalg.norm(vectors, axis=1, keepdims=True) + 1e-10
        vectors_norm = vectors / norms
        # Dot product of unit vectors == cosine similarity.
        return np.dot(vectors_norm, query_norm)

    def _filter_documents(self, indices: List[int], filter_dict: Optional[Dict] = None) -> List[int]:
        """Filter document indices based on metadata criteria.

        Supports a ChromaDB-style subset: a top-level ``"$and"`` holding a
        list of ``{key: value}`` conditions, plain ``{key: value}`` equality,
        and ``{key: {"$eq": value}}`` equality.  Input ordering is preserved,
        so callers may rely on indices staying ranked.
        """
        if not filter_dict:
            return indices
        filtered = []
        for idx in indices:
            metadata = self.documents[idx]["metadata"]
            # Handle $and operator: every condition must match.
            if "$and" in filter_dict:
                all_match = all(
                    metadata.get(key) == value
                    for condition in filter_dict["$and"]
                    for key, value in condition.items()
                )
                if all_match:
                    filtered.append(idx)
            # Handle simple key-value (or {"$eq": ...}) filters.
            else:
                match = True
                for key, value in filter_dict.items():
                    expected = value["$eq"] if isinstance(value, dict) and "$eq" in value else value
                    if metadata.get(key) != expected:
                        match = False
                        break
                if match:
                    filtered.append(idx)
        return filtered

    def query_with_filter(self, query: str, product: str, version: str, k: int = 5) -> List[Document]:
        """Query restricted to a single product/version pair."""
        logger.info(f"Querying {product} {version} for: {query}")
        filter_dict = {"$and": [{"product": product}, {"version": version}]}
        return self._query(query, k, filter_dict)

    def query_product_all_versions(self, query: str, product: str, k: int = 5) -> List[Document]:
        """Query across all versions of a product."""
        logger.info(f"Querying all {product} versions for: {query}")
        filter_dict = {"product": {"$eq": product}}
        return self._query(query, k, filter_dict)

    def query_all_products(self, query: str, k: int = 5) -> List[Document]:
        """Query across all products and versions."""
        logger.info(f"Querying all products for: {query}")
        return self._query(query, k, None)

    def _query(self, query: str, k: int = 5, filter_dict: Optional[Dict] = None) -> List[Document]:
        """Embed *query*, rank all documents, and return the top ``k``.

        Returns an empty list when nothing is loaded or when the embedding
        call fails (errors are logged, never raised to the caller).
        """
        if self.vectors is None or len(self.documents) == 0:
            logger.warning("No documents loaded")
            return []
        # Get the query embedding (network call to OpenAI).
        try:
            query_embedding = self.embeddings_model.embed_query(query)
            query_vector = np.array(query_embedding, dtype=np.float32)
        except Exception as e:
            logger.error(f"Error getting query embedding: {e}")
            return []
        # Compute similarities against every stored vector.
        similarities = self._cosine_similarity(query_vector, self.vectors)
        # Rank all rows best-first; filtering below preserves this order.
        top_indices = np.argsort(similarities)[::-1]
        # Apply metadata filters (after ranking, so top-k is post-filter).
        if filter_dict:
            top_indices = self._filter_documents(top_indices.tolist(), filter_dict)
        # Take top k after filtering
        top_indices = top_indices[:k]
        # Convert to LangChain Document objects.
        results = [
            Document(
                page_content=self.documents[idx]["content"],
                metadata=self.documents[idx]["metadata"]
            )
            for idx in top_indices
        ]
        logger.info(f"Found {len(results)} documents")
        return results

    def list_available_versions(self) -> Dict[str, List[str]]:
        """List all available product versions, cached after the first call.

        Returns:
            Mapping of product name -> sorted list of version strings.
        """
        if self._available_versions is not None:
            return self._available_versions
        versions_map: Dict[str, set] = {}
        for doc in self.documents:
            meta = doc["metadata"]
            product = meta.get("product", "unknown")
            version = meta.get("version", "unknown")
            versions_map.setdefault(product, set()).add(version)
        # Convert sets to sorted lists.
        self._available_versions = {
            product: sorted(versions)
            for product, versions in versions_map.items()
        }
        return self._available_versions
# Module-level slot for the lazily-created singleton.
_db_instance = None


def get_simple_vector_db(config=None) -> SimpleVectorDB:
    """Return the shared vector-database instance, building it on first use.

    Note: ``config`` is honoured only on the call that creates the
    instance; subsequent calls return the existing singleton unchanged.
    """
    global _db_instance
    if _db_instance is not None:
        return _db_instance
    _db_instance = SimpleVectorDB(config)
    return _db_instance