Spaces:
Sleeping
Sleeping
| """ | |
| Vector Store Module | |
| =================== | |
| Purpose: Store embeddings and retrieve similar ones | |
| This module uses Chroma for persistent, efficient vector storage. | |
| Chroma is free, local, and production-ready. | |
| Key Concepts: | |
| • Vector storage: Persistent storage mapping chunk_id → embedding | |
| • Metadata: Source info, text preview, etc. | |
| • Retrieval: Find top-k most similar vectors using cosine similarity | |
| • Persistence: Data survives application restarts | |
| """ | |
| from typing import List, Dict, Any | |
| from dataclasses import dataclass, field | |
| import logging | |
| import chromadb | |
| import os | |
| logger = logging.getLogger(__name__) | |
@dataclass
class RetrievalResult:
    """A single retrieved chunk with metadata.

    Declared as a dataclass so that the keyword-argument construction used
    by ``ChromaVectorStore.retrieve`` works and so that ``metadata`` gets a
    fresh dict per instance instead of a shared (or raw ``Field``) default.

    Attributes:
        chunk_id: Identifier of the stored chunk.
        text: Text content of the chunk.
        similarity: Similarity score of the chunk against the query.
        metadata: Arbitrary metadata attached to the chunk; defaults to an
            empty dict.
    """

    chunk_id: str
    text: str
    similarity: float
    metadata: Dict[str, Any] = field(default_factory=dict)
class ChromaVectorStore:
    """
    Vector store using Chroma (persistent, free, production-ready).

    Chroma is a modern vector database that:
    • Stores embeddings persistently on disk
    • Provides similarity search
    • Is completely free and open source
    • Works locally (no API calls)

    This is the recommended implementation for production RAG systems.

    The store can be used as a context manager; leaving the ``with`` block
    makes a best-effort attempt to flush and shut down the client.
    """

    def __init__(self, persist_directory: str = ".chromadb", collection_name: str = "rag"):
        """
        Initialize Chroma vector store.

        Args:
            persist_directory: Where to store vectors on disk
            collection_name: Name of the collection (namespace)

        Raises:
            Exception: Whatever the Chroma client raises while opening the
                directory or collection; the error is logged and re-raised.

        Example:
            >>> store = ChromaVectorStore(persist_directory="./data/vectors")
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name

        # Ensure persist directory exists
        os.makedirs(persist_directory, exist_ok=True)

        try:
            # Create persistent client
            self.client = chromadb.PersistentClient(path=persist_directory)
            # Get or create collection, asking for the cosine metric so that
            # retrieve() can turn distances into similarities.
            self.collection = self.client.get_or_create_collection(
                name=collection_name,
                metadata={"hnsw:space": "cosine"},  # Use cosine similarity
            )
            logger.info(
                f"✓ Initialized Chroma vector store at {persist_directory} "
                f"(collection: {collection_name})"
            )
        except Exception as e:
            logger.error(f"Failed to initialize Chroma: {e}")
            raise

    def __enter__(self):
        """Return the store itself so it can be bound with ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Best-effort flush/shutdown of the client when leaving a ``with`` block.

        ``persist`` and ``shutdown`` are looked up dynamically because the
        client object is not guaranteed to expose them. Each call is
        attempted independently, and failures are logged instead of being
        silently discarded. Exceptions raised inside the ``with`` body are
        never suppressed.
        """
        client = getattr(self, "client", None)
        for method_name in ("persist", "shutdown"):
            method = getattr(client, method_name, None)
            if not callable(method):
                continue
            try:
                method()
            except Exception as e:
                # Cleanup must not mask an error from the with-body.
                logger.debug(f"Chroma client {method_name}() failed during exit: {e}")

    def add(
        self,
        chunk_id: str,
        text: str,
        embedding: List[float],
        metadata: Dict[str, Any] = None
    ) -> None:
        """
        Add a chunk with its embedding to the store.

        Args:
            chunk_id: Unique identifier for chunk
            text: Original text content
            embedding: Vector representation (list of floats)
            metadata: Optional metadata (source, page number, etc.)

        Raises:
            Exception: Whatever the collection raises; logged and re-raised.

        Example:
            >>> store.add(
            ...     "doc1_chunk_0",
            ...     "Machine learning is AI",
            ...     [0.1, 0.2, ..., 0.384],
            ...     metadata={"doc_id": "doc1", "page": 1}
            ... )
        """
        try:
            self.collection.add(
                ids=[chunk_id],
                documents=[text],
                embeddings=[embedding],
                # Send no metadata at all rather than an empty dict:
                # NOTE(review): some Chroma releases reject ``{}`` as
                # metadata, while ``None`` is accepted — confirm against the
                # pinned chromadb version.
                metadatas=[metadata] if metadata else None,
            )
            logger.debug(f"Added chunk {chunk_id} ({len(text)} chars)")
        except Exception as e:
            logger.error(f"Failed to add chunk {chunk_id}: {e}")
            raise

    def retrieve(
        self,
        query_embedding: List[float],
        top_k: int = 5
    ) -> List[RetrievalResult]:
        """
        Find most similar chunks to query.

        Args:
            query_embedding: Query vector
            top_k: Number of results to return; capped at the number of
                stored chunks

        Returns:
            List of RetrievalResult objects in the order Chroma returns them
            (nearest first). Empty when the store is empty or nothing matches.

        Raises:
            Exception: Whatever the collection raises; logged and re-raised.

        Example:
            >>> results = store.retrieve(query_embedding, top_k=3)
            >>> for r in results:
            ...     print(f"{r.similarity:.3f} | {r.text[:60]}")
        """
        try:
            stored = self.collection.count()
            if stored == 0:
                logger.warning("Vector store is empty")
                return []

            # Never ask for more neighbours than exist. Invalid values
            # (non-int, <= 0) are passed through untouched so the collection
            # reports them exactly as before.
            n_results = top_k
            if isinstance(top_k, int) and top_k > stored:
                n_results = stored

            # Query Chroma
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=n_results,
            )
            if not results["ids"] or not results["ids"][0]:
                logger.debug("No results found for query")
                return []

            # Each field is a list with one entry per query; only one query
            # is sent, so take index 0 of each.
            ids = results["ids"][0]
            documents = results["documents"][0]
            distances = results["distances"][0]
            metadatas = results["metadatas"][0]

            # The collection is created with the cosine metric, so Chroma
            # returns cosine distances; similarity = 1 - distance.
            retrieval_results = [
                RetrievalResult(
                    chunk_id=chunk_id,
                    text=document,
                    similarity=1 - distance,
                    # Chunks stored without metadata may come back as None.
                    metadata=chunk_metadata or {},
                )
                for chunk_id, document, distance, chunk_metadata in zip(
                    ids, documents, distances, metadatas
                )
            ]
            logger.debug(f"Retrieved {len(retrieval_results)} chunks")
            return retrieval_results
        except Exception as e:
            logger.error(f"Retrieval failed: {e}")
            raise

    def size(self) -> int:
        """
        Return number of chunks in store.

        Returns:
            The collection count, or 0 if the count could not be obtained
            (the error is logged, not raised).
        """
        try:
            return self.collection.count()
        except Exception as e:
            logger.error(f"Failed to get store size: {e}")
            return 0

    def delete(self, chunk_id: str) -> bool:
        """
        Delete a chunk from the store.

        Existence is checked before deleting so that the return value can
        distinguish "deleted" from "not found"; the result of the delete call
        itself is not used for that.

        Args:
            chunk_id: ID of chunk to delete

        Returns:
            True if deleted, False if not found (or if the operation failed;
            failures are logged)
        """
        try:
            # include=[] asks for ids only, avoiding document/metadata loads.
            existing = self.collection.get(ids=[chunk_id], include=[])
            if not existing["ids"]:
                logger.debug(f"Chunk {chunk_id} not found; nothing deleted")
                return False
            self.collection.delete(ids=[chunk_id])
            logger.debug(f"Deleted chunk {chunk_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete chunk {chunk_id}: {e}")
            return False

    def clear(self) -> None:
        """
        Clear all vectors from store.

        Raises:
            Exception: Whatever the collection raises; logged and re-raised.
        """
        try:
            # Only the ids are needed to delete, so skip loading documents
            # and metadata for the whole collection.
            all_ids = self.collection.get(include=[])["ids"]
            if all_ids:
                self.collection.delete(ids=all_ids)
            logger.info("Cleared vector store")
        except Exception as e:
            logger.error(f"Failed to clear store: {e}")
            raise
| # ============ TESTS ============ | |
| import tempfile | |
| import shutil | |
| import time | |
def test_chroma_vector_store():
    """
    Smoke-test ChromaVectorStore against a throwaway on-disk directory.

    Adds three 3-dimensional vectors, queries with the first one and checks
    that two results come back with the identical vector ranked first. The
    temporary directory is removed afterwards, with retries because the
    removal can fail with PermissionError while file handles are still held
    (the original code targets Windows here).
    """
    temp_dir = tempfile.mkdtemp()
    store = None
    try:
        # Built inside the try so the temp directory is still cleaned up if
        # the constructor raises.
        store = ChromaVectorStore(persist_directory=temp_dir)

        # Add chunks
        vec1 = [1.0, 0.0, 0.0]
        vec2 = [0.9, 0.1, 0.0]
        vec3 = [0.0, 1.0, 0.0]
        store.add("chunk1", "Machine learning", vec1, metadata={"source": "test"})
        store.add("chunk2", "Deep learning networks", vec2, metadata={"source": "test"})
        store.add("chunk3", "Cooking recipes", vec3, metadata={"source": "test"})

        # Retrieve
        results = store.retrieve(vec1, top_k=2)
        assert len(results) == 2
        assert results[0].chunk_id == "chunk1"
        print("✓ Chroma test passed!")
    finally:
        # Cleanup Chroma resources
        if store is not None:
            client = getattr(store, "client", None)
            close = getattr(client, "close", None)
            try:
                # close() is optional: call it only when the client has one.
                if callable(close):
                    close()
            except Exception as e:
                logger.warning(f"Error closing Chroma client: {e}")
            finally:
                # Drop every reference even when close() failed, otherwise
                # the handles on temp_dir are never released.
                for attr in ("client", "collection"):
                    if hasattr(store, attr):
                        delattr(store, attr)
                client = close = None

        # Retry logic for Windows file deletion: only wait when the removal
        # actually fails, instead of sleeping unconditionally.
        max_retries = 5
        for attempt in range(1, max_retries + 1):
            try:
                shutil.rmtree(temp_dir)
                break
            except PermissionError:
                if attempt < max_retries:
                    time.sleep(0.5)
                else:
                    logger.warning(f"Could not delete temp directory {temp_dir}, skipping")
if __name__ == "__main__":
    # Script entry point: run the module's self-tests with INFO logging.
    logging.basicConfig(level=logging.INFO)

    # Test Chroma
    try:
        test_chroma_vector_store()
    except ImportError:
        print("Chroma not installed, skipping test")

    # Test SimpleVectorStore
    # test_simple_vector_store is not defined in this module as shown, so a
    # direct call would end the run with NameError. Look it up and run it
    # only when it actually exists.
    simple_store_test = globals().get("test_simple_vector_store")
    if callable(simple_store_test):
        simple_store_test()
    else:
        print("test_simple_vector_store not defined, skipping test")