""" Vector Database Storage (ChromaDB + LangChain) ===================================================================== Stores and retrieves document chunks using ChromaDB """ from typing import List, Optional, Dict, Any from pathlib import Path import shutil import sys import os # Add parent directory to path for imports when running as script sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # LangChain vector store from langchain_chroma import Chroma from langchain_core.documents import Document as LangChainDocument # Our components try: from .embedder import Embedder from .chunker import TextChunk, Chunker except ImportError: # Fallback for direct execution from embedder import Embedder from chunker import TextChunk, Chunker # Config import os from dotenv import load_dotenv load_dotenv() class VectorStore: """ChromaDB vector store for RAG pipeline""" def __init__( self, collection_name: str, embedder: Embedder, persist_directory: Optional[str] = None, ): """ Initialize vector store Args: collection_name: Name of the collection (e.g., "pipeline_a_docs") embedder: Embedder instance to use persist_directory: Directory to persist the database """ self.collection_name = collection_name self.embedder = embedder # Set persist directory if persist_directory is None: persist_directory = os.getenv("CHROMA_PERSIST_DIR", "./data/vector_stores") self.persist_directory = Path(persist_directory) / collection_name self.persist_directory.mkdir(parents=True, exist_ok=True) # Initialize ChromaDB with LangChain self.vectorstore = Chroma( collection_name=collection_name, embedding_function=embedder.embeddings, # LangChain embeddings persist_directory=str(self.persist_directory), ) print(f"✅ Vector store initialized: {collection_name}") def add_chunks(self, chunks: List[TextChunk], metadata: Optional[Dict] = None) -> List[str]: """ Add text chunks to the vector store Args: chunks: List of TextChunk objects metadata: Optional metadata to add to all chunks Returns: List of document IDs """ if not chunks: return [] # Convert TextChunks to LangChain Documents documents = [] for chunk in chunks: doc_metadata = { "chunk_id": chunk.chunk_id, "token_count": chunk.token_count, "start_char": chunk.start_char, "end_char": chunk.end_char, } # Add custom metadata if metadata: doc_metadata.update(metadata) # Add chunk metadata if chunk.metadata: doc_metadata.update(chunk.metadata) documents.append(LangChainDocument( page_content=chunk.text, metadata=doc_metadata )) # Add to vector store ids = self.vectorstore.add_documents(documents) #print(f" ✅ Added {len(ids)} chunks to vector store") return ids def similarity_search( self, query: str, k: int = 5, filter: Optional[Dict] = None ) -> List[Dict[str, Any]]: """ Search for similar documents Args: query: Query text k: Number of results to return filter: Optional metadata filter Returns: List of results with text and metadata """ # Perform similarity search results = self.vectorstore.similarity_search( query=query, k=k, filter=filter ) # Convert to our format formatted_results = [] for doc in results: formatted_results.append({ "text": doc.page_content, "metadata": doc.metadata, }) return formatted_results def similarity_search_with_score( self, query: str, k: int = 5, filter: Optional[Dict] = None ) -> List[Dict[str, Any]]: """ Search for similar documents with similarity scores Args: query: Query text k: Number of results to return filter: Optional metadata filter Returns: List of results with text, metadata, and scores """ # Perform similarity search with scores results = self.vectorstore.similarity_search_with_score( query=query, k=k, filter=filter ) # Convert to our format formatted_results = [] for doc, score in results: formatted_results.append({ "text": doc.page_content, "metadata": doc.metadata, "score": float(score), }) return formatted_results def get_retriever(self, k: int = 5): """ Get a LangChain retriever for this vector store Args: k: Number of documents to retrieve Returns: LangChain retriever """ return self.vectorstore.as_retriever(search_kwargs={"k": k}) def delete_collection(self): """Delete the entire collection""" try: # Close connection first del self.vectorstore # Wait a moment for Windows to release file handles import time time.sleep(0.5) # Now delete if self.persist_directory.exists(): shutil.rmtree(self.persist_directory) print(f" ✅ Deleted collection: {self.collection_name}") except Exception as e: print(f" ⚠️ Error deleting collection: {e}") print(f" 💡 Manually delete: {self.persist_directory}") def count(self) -> int: """Get the number of documents in the collection""" try: # ChromaDB doesn't have a direct count, so we query collection = self.vectorstore._collection return collection.count() except: return 0 def get_stats(self) -> Dict[str, Any]: """Get statistics about the vector store""" count = self.count() return { "collection_name": self.collection_name, "document_count": count, "persist_directory": str(self.persist_directory), "embedding_model": self.embedder.model_name, "embedding_dimension": self.embedder.dimension, } # ============================================================================ # USAGE EXAMPLE # ============================================================================ if __name__ == "__main__": print("🗄️ Vector Store Test (ChromaDB + LangChain)") print("=" * 80) # Initialize embedder print("\n1️⃣ Initializing embedder...") embedder = Embedder( provider="sentence-transformers", model_name="all-MiniLM-L6-v2" ) print(f" ✅ Embedder ready: {embedder.model_name} ({embedder.dimension}D)") # Initialize vector store print("\n2️⃣ Initializing vector store...") vector_store = VectorStore( collection_name="test_collection", embedder=embedder, persist_directory="./data/vector_stores" ) # Create sample chunks print("\n3️⃣ Creating sample chunks...") try: from .chunker import Chunker, TextChunk except ImportError: from core.chunker import Chunker, TextChunk sample_text = """ # RAG Pipeline Optimizer The RAG Pipeline Optimizer helps you evaluate different RAG configurations. ## Key Features 1. Multiple embedding models (Azure OpenAI, local models) 2. Different chunking strategies 3. Parallel pipeline execution 4. Cost tracking and optimization ## How It Works Upload your documents, and the system tests 6 different pipeline configurations simultaneously. Each pipeline uses different chunk sizes, embedding models, and retrieval strategies. ## Benefits - Find the optimal RAG configuration for your use case - Compare accuracy vs cost tradeoffs - Production-ready deployment """ chunker = Chunker(chunk_size=100, chunk_overlap=20) chunks = chunker.chunk(sample_text, strategy="recursive") print(f" ✅ Created {len(chunks)} chunks") # Add chunks to vector store print("\n4️⃣ Adding chunks to vector store...") doc_ids = vector_store.add_chunks( chunks, metadata={"source": "test_document", "doc_type": "readme"} ) print(f" ✅ Added {len(doc_ids)} documents") # Get stats print("\n5️⃣ Vector store stats:") stats = vector_store.get_stats() for key, value in stats.items(): print(f" {key}: {value}") # Test similarity search print("\n6️⃣ Testing similarity search...") query = "How does the RAG optimizer work?" results = vector_store.similarity_search(query, k=3) print(f" Query: '{query}'") print(f" Found {len(results)} results:\n") for i, result in enumerate(results, 1): print(f" Result {i}:") print(f" Text: {result['text'][:100]}...") print(f" Metadata: {result['metadata']}") print() # Test with scores print("\n7️⃣ Testing similarity search with scores...") results_with_scores = vector_store.similarity_search_with_score(query, k=3) print(f" Query: '{query}'") print(f" Results with scores:\n") for i, result in enumerate(results_with_scores, 1): print(f" Result {i} (score: {result['score']:.4f}):") print(f" Text: {result['text'][:80]}...") print() # Test retriever print("\n8️⃣ Testing LangChain retriever...") retriever = vector_store.get_retriever(k=2) # Use invoke() instead of get_relevant_documents() for newer LangChain versions try: retrieved_docs = retriever.invoke(query) except AttributeError: # Fallback for older versions retrieved_docs = retriever.get_relevant_documents(query) print(f" ✅ Retrieved {len(retrieved_docs)} documents using LangChain retriever") # Cleanup print("\n9️⃣ Cleaning up test collection...") vector_store.delete_collection() print("\n" + "=" * 80) print("✅ Vector store test complete!")