Spaces:
Sleeping
Sleeping
| """ | |
| Vector Database Storage (ChromaDB + LangChain) | |
| ===================================================================== | |
| Stores and retrieves document chunks using ChromaDB | |
| """ | |
| from typing import List, Optional, Dict, Any | |
| from pathlib import Path | |
| import shutil | |
| import sys | |
| import os | |
| # Add parent directory to path for imports when running as script | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| # LangChain vector store | |
| from langchain_chroma import Chroma | |
| from langchain_core.documents import Document as LangChainDocument | |
| # Our components | |
| try: | |
| from .embedder import Embedder | |
| from .chunker import TextChunk, Chunker | |
| except ImportError: | |
| # Fallback for direct execution | |
| from embedder import Embedder | |
| from chunker import TextChunk, Chunker | |
| # Config | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class VectorStore:
    """ChromaDB-backed vector store for a RAG pipeline.

    Wraps a LangChain ``Chroma`` collection: converts ``TextChunk`` objects
    into LangChain ``Document``s for storage, and exposes plain similarity
    search, scored search, a LangChain retriever, and collection management
    (count / stats / delete).
    """

    def __init__(
        self,
        collection_name: str,
        embedder: Embedder,
        persist_directory: Optional[str] = None,
    ):
        """
        Initialize the vector store.

        Args:
            collection_name: Name of the collection (e.g., "pipeline_a_docs")
            embedder: Embedder instance to use
            persist_directory: Directory to persist the database. Defaults to
                the CHROMA_PERSIST_DIR env var, or "./data/vector_stores".
        """
        self.collection_name = collection_name
        self.embedder = embedder
        # Resolve the base persist directory from config when not given.
        if persist_directory is None:
            persist_directory = os.getenv("CHROMA_PERSIST_DIR", "./data/vector_stores")
        # Each collection lives in its own subdirectory so delete_collection()
        # can remove it without touching sibling collections.
        self.persist_directory = Path(persist_directory) / collection_name
        self.persist_directory.mkdir(parents=True, exist_ok=True)
        # Initialize ChromaDB through the LangChain wrapper.
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=embedder.embeddings,  # LangChain embeddings interface
            persist_directory=str(self.persist_directory),
        )
        print(f"✅ Vector store initialized: {collection_name}")

    def add_chunks(self, chunks: List[TextChunk], metadata: Optional[Dict] = None) -> List[str]:
        """
        Add text chunks to the vector store.

        Args:
            chunks: List of TextChunk objects
            metadata: Optional metadata to merge into every chunk's metadata.
                Per-chunk metadata (``chunk.metadata``) takes precedence over
                this shared dict, which takes precedence over the built-in
                positional fields (chunk_id, token_count, start/end_char).

        Returns:
            List of document IDs assigned by the store ([] for empty input).
        """
        if not chunks:
            return []
        documents = []
        for chunk in chunks:
            # Built-in positional fields first; shared then per-chunk metadata
            # layered on top (later updates win on key collisions).
            doc_metadata = {
                "chunk_id": chunk.chunk_id,
                "token_count": chunk.token_count,
                "start_char": chunk.start_char,
                "end_char": chunk.end_char,
            }
            if metadata:
                doc_metadata.update(metadata)
            if chunk.metadata:
                doc_metadata.update(chunk.metadata)
            documents.append(LangChainDocument(
                page_content=chunk.text,
                metadata=doc_metadata,
            ))
        return self.vectorstore.add_documents(documents)

    def similarity_search(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query: Query text
            k: Number of results to return
            filter: Optional metadata filter (passed through to Chroma)

        Returns:
            List of dicts with "text" and "metadata" keys.
        """
        results = self.vectorstore.similarity_search(
            query=query,
            k=k,
            filter=filter,
        )
        return [
            {"text": doc.page_content, "metadata": doc.metadata}
            for doc in results
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents with similarity scores.

        Args:
            query: Query text
            k: Number of results to return
            filter: Optional metadata filter (passed through to Chroma)

        Returns:
            List of dicts with "text", "metadata", and "score" keys.
            NOTE(review): Chroma returns a distance here (lower = more
            similar) — confirm downstream code interprets it that way.
        """
        results = self.vectorstore.similarity_search_with_score(
            query=query,
            k=k,
            filter=filter,
        )
        return [
            {"text": doc.page_content, "metadata": doc.metadata, "score": float(score)}
            for doc, score in results
        ]

    def get_retriever(self, k: int = 5):
        """
        Get a LangChain retriever for this vector store.

        Args:
            k: Number of documents to retrieve per query

        Returns:
            LangChain retriever wrapping this store.
        """
        return self.vectorstore.as_retriever(search_kwargs={"k": k})

    def delete_collection(self):
        """Delete the entire collection and its on-disk directory.

        Best-effort: failures (e.g. Windows file locks) are reported, not
        raised, and the user is told how to clean up manually.
        """
        try:
            # Drop our reference first so Chroma can release file handles.
            del self.vectorstore
            # Give Windows a moment to actually release them before rmtree.
            import time
            time.sleep(0.5)
            if self.persist_directory.exists():
                shutil.rmtree(self.persist_directory)
            print(f"  ✅ Deleted collection: {self.collection_name}")
        except Exception as e:
            print(f"  ⚠️ Error deleting collection: {e}")
            print(f"  💡 Manually delete: {self.persist_directory}")

    def count(self) -> int:
        """Get the number of documents in the collection (0 on any error)."""
        try:
            # Reaches into the private LangChain attribute: the wrapper does
            # not expose a public count(), but the underlying Chroma
            # collection does.
            return self.vectorstore._collection.count()
        except Exception:
            # Narrowed from a bare except so Ctrl-C / SystemExit still
            # propagate; any store-level failure is reported as "empty".
            return 0

    def get_stats(self) -> Dict[str, Any]:
        """Get summary statistics about the vector store."""
        return {
            "collection_name": self.collection_name,
            "document_count": self.count(),
            "persist_directory": str(self.persist_directory),
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.dimension,
        }
# ============================================================================
# USAGE EXAMPLE
# ============================================================================
if __name__ == "__main__":
    print("🗄️ Vector Store Test (ChromaDB + LangChain)")
    print("=" * 80)

    # 1. Local sentence-transformers model — no API key needed for the demo.
    print("\n1️⃣ Initializing embedder...")
    embedder = Embedder(
        provider="sentence-transformers",
        model_name="all-MiniLM-L6-v2",
    )
    print(f"  ✅ Embedder ready: {embedder.model_name} ({embedder.dimension}D)")

    # 2. Store persists under ./data/vector_stores/test_collection.
    print("\n2️⃣ Initializing vector store...")
    vector_store = VectorStore(
        collection_name="test_collection",
        embedder=embedder,
        persist_directory="./data/vector_stores",
    )

    # 3. Chunk a small markdown sample. Chunker/TextChunk are already
    # imported at module top (with a direct-execution fallback), so no
    # re-import is needed here.
    print("\n3️⃣ Creating sample chunks...")
    sample_text = """
# RAG Pipeline Optimizer

The RAG Pipeline Optimizer helps you evaluate different RAG configurations.

## Key Features

1. Multiple embedding models (Azure OpenAI, local models)
2. Different chunking strategies
3. Parallel pipeline execution
4. Cost tracking and optimization

## How It Works

Upload your documents, and the system tests 6 different pipeline configurations simultaneously.
Each pipeline uses different chunk sizes, embedding models, and retrieval strategies.

## Benefits

- Find the optimal RAG configuration for your use case
- Compare accuracy vs cost tradeoffs
- Production-ready deployment
"""
    chunker = Chunker(chunk_size=100, chunk_overlap=20)
    chunks = chunker.chunk(sample_text, strategy="recursive")
    print(f"  ✅ Created {len(chunks)} chunks")

    # 4. Ingest with shared metadata applied to every chunk.
    print("\n4️⃣ Adding chunks to vector store...")
    doc_ids = vector_store.add_chunks(
        chunks,
        metadata={"source": "test_document", "doc_type": "readme"},
    )
    print(f"  ✅ Added {len(doc_ids)} documents")

    # 5. Stats sanity check.
    print("\n5️⃣ Vector store stats:")
    stats = vector_store.get_stats()
    for key, value in stats.items():
        print(f"  {key}: {value}")

    # 6. Plain similarity search.
    print("\n6️⃣ Testing similarity search...")
    query = "How does the RAG optimizer work?"
    results = vector_store.similarity_search(query, k=3)
    print(f"  Query: '{query}'")
    print(f"  Found {len(results)} results:\n")
    for i, result in enumerate(results, 1):
        print(f"  Result {i}:")
        print(f"    Text: {result['text'][:100]}...")
        print(f"    Metadata: {result['metadata']}")
        print()

    # 7. Scored search.
    print("\n7️⃣ Testing similarity search with scores...")
    results_with_scores = vector_store.similarity_search_with_score(query, k=3)
    print(f"  Query: '{query}'")
    print("  Results with scores:\n")  # was an f-string with no placeholder
    for i, result in enumerate(results_with_scores, 1):
        print(f"  Result {i} (score: {result['score']:.4f}):")
        print(f"    Text: {result['text'][:80]}...")
        print()

    # 8. LangChain retriever interface.
    print("\n8️⃣ Testing LangChain retriever...")
    retriever = vector_store.get_retriever(k=2)
    # invoke() is the modern Runnable API; fall back for older LangChain.
    try:
        retrieved_docs = retriever.invoke(query)
    except AttributeError:
        retrieved_docs = retriever.get_relevant_documents(query)
    print(f"  ✅ Retrieved {len(retrieved_docs)} documents using LangChain retriever")

    # 9. Remove the on-disk test collection.
    print("\n9️⃣ Cleaning up test collection...")
    vector_store.delete_collection()

    print("\n" + "=" * 80)
    print("✅ Vector store test complete!")