# RAG-Pipeline-Optimizer / core/vector_store.py
"""
Vector Database Storage (ChromaDB + LangChain)
=====================================================================
Stores and retrieves document chunks using ChromaDB
"""
from typing import List, Optional, Dict, Any
from pathlib import Path
import shutil
import sys
import os
# Add parent directory to path for imports when running as script
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# LangChain vector store
from langchain_chroma import Chroma
from langchain_core.documents import Document as LangChainDocument
# Our components
try:
from .embedder import Embedder
from .chunker import TextChunk, Chunker
except ImportError:
# Fallback for direct execution
from embedder import Embedder
from chunker import TextChunk, Chunker
# Config
import os
from dotenv import load_dotenv
load_dotenv()
class VectorStore:
    """ChromaDB vector store for RAG pipeline.

    Thin wrapper around LangChain's ``Chroma`` that stores ``TextChunk``
    objects and exposes similarity search in a plain-dict result format.
    """

    def __init__(
        self,
        collection_name: str,
        embedder: Embedder,
        persist_directory: Optional[str] = None,
    ) -> None:
        """
        Initialize vector store.

        Args:
            collection_name: Name of the collection (e.g., "pipeline_a_docs")
            embedder: Embedder instance to use
            persist_directory: Directory to persist the database. Defaults to
                the CHROMA_PERSIST_DIR env var, or "./data/vector_stores".
        """
        self.collection_name = collection_name
        self.embedder = embedder
        if persist_directory is None:
            persist_directory = os.getenv("CHROMA_PERSIST_DIR", "./data/vector_stores")
        # Each collection gets its own subdirectory so it can be deleted
        # independently (see delete_collection()).
        self.persist_directory = Path(persist_directory) / collection_name
        self.persist_directory.mkdir(parents=True, exist_ok=True)
        # Initialize ChromaDB through the LangChain wrapper
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=embedder.embeddings,  # LangChain embeddings
            persist_directory=str(self.persist_directory),
        )
        print(f"✅ Vector store initialized: {collection_name}")

    def add_chunks(self, chunks: List[TextChunk], metadata: Optional[Dict] = None) -> List[str]:
        """
        Add text chunks to the vector store.

        Args:
            chunks: List of TextChunk objects
            metadata: Optional metadata applied to every chunk

        Returns:
            List of document IDs (empty list when no chunks are given)
        """
        if not chunks:
            return []
        # Convert TextChunks to LangChain Documents
        documents = []
        for chunk in chunks:
            doc_metadata = {
                "chunk_id": chunk.chunk_id,
                "token_count": chunk.token_count,
                "start_char": chunk.start_char,
                "end_char": chunk.end_char,
            }
            # Shared metadata first, then per-chunk metadata, so a chunk-level
            # key deliberately overrides the shared one on collision.
            if metadata:
                doc_metadata.update(metadata)
            if chunk.metadata:
                doc_metadata.update(chunk.metadata)
            documents.append(LangChainDocument(
                page_content=chunk.text,
                metadata=doc_metadata,
            ))
        return self.vectorstore.add_documents(documents)

    def similarity_search(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query: Query text
            k: Number of results to return
            filter: Optional metadata filter

        Returns:
            List of dicts with "text" and "metadata" keys
        """
        results = self.vectorstore.similarity_search(
            query=query,
            k=k,
            filter=filter,
        )
        return [
            {"text": doc.page_content, "metadata": doc.metadata}
            for doc in results
        ]

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 5,
        filter: Optional[Dict] = None,
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents with similarity scores.

        Args:
            query: Query text
            k: Number of results to return
            filter: Optional metadata filter

        Returns:
            List of dicts with "text", "metadata", and "score" keys
        """
        results = self.vectorstore.similarity_search_with_score(
            query=query,
            k=k,
            filter=filter,
        )
        return [
            {"text": doc.page_content, "metadata": doc.metadata, "score": float(score)}
            for doc, score in results
        ]

    def get_retriever(self, k: int = 5):
        """
        Get a LangChain retriever for this vector store.

        Args:
            k: Number of documents to retrieve

        Returns:
            LangChain retriever
        """
        return self.vectorstore.as_retriever(search_kwargs={"k": k})

    def delete_collection(self) -> None:
        """Delete the collection's on-disk data (best effort)."""
        try:
            # Drop our reference first so ChromaDB releases its file handles.
            del self.vectorstore
            # Give Windows a moment to release file locks before rmtree.
            import time
            time.sleep(0.5)
            if self.persist_directory.exists():
                shutil.rmtree(self.persist_directory)
            print(f" ✅ Deleted collection: {self.collection_name}")
        except Exception as e:
            # Best-effort cleanup: report the failure instead of raising.
            print(f" ⚠️ Error deleting collection: {e}")
            print(f" 💡 Manually delete: {self.persist_directory}")

    def count(self) -> int:
        """Get the number of documents in the collection (0 on failure)."""
        try:
            # The LangChain wrapper exposes no count(); use the underlying
            # chromadb collection (private attribute).
            return self.vectorstore._collection.count()
        except Exception:
            # Collection may already be closed/deleted; report empty rather
            # than propagate. (Was a bare `except:` — narrowed so system
            # exits like KeyboardInterrupt are not swallowed.)
            return 0

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the vector store."""
        return {
            "collection_name": self.collection_name,
            "document_count": self.count(),
            "persist_directory": str(self.persist_directory),
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.dimension,
        }
# ============================================================================
# USAGE EXAMPLE
# ============================================================================
if __name__ == "__main__":
    # Smoke test: embed sample chunks, store them, search, then clean up.
    print("🗄️ Vector Store Test (ChromaDB + LangChain)")
    print("=" * 80)

    # 1. Initialize embedder (local sentence-transformers model)
    print("\n1️⃣ Initializing embedder...")
    embedder = Embedder(
        provider="sentence-transformers",
        model_name="all-MiniLM-L6-v2"
    )
    print(f" ✅ Embedder ready: {embedder.model_name} ({embedder.dimension}D)")

    # 2. Initialize vector store
    print("\n2️⃣ Initializing vector store...")
    vector_store = VectorStore(
        collection_name="test_collection",
        embedder=embedder,
        persist_directory="./data/vector_stores"
    )

    # 3. Create sample chunks
    # NOTE: Chunker/TextChunk are already imported at module top (with the
    # same try/except fallback), so the former re-import here was dead code.
    print("\n3️⃣ Creating sample chunks...")
    sample_text = """
# RAG Pipeline Optimizer
The RAG Pipeline Optimizer helps you evaluate different RAG configurations.
## Key Features
1. Multiple embedding models (Azure OpenAI, local models)
2. Different chunking strategies
3. Parallel pipeline execution
4. Cost tracking and optimization
## How It Works
Upload your documents, and the system tests 6 different pipeline configurations simultaneously.
Each pipeline uses different chunk sizes, embedding models, and retrieval strategies.
## Benefits
- Find the optimal RAG configuration for your use case
- Compare accuracy vs cost tradeoffs
- Production-ready deployment
"""
    chunker = Chunker(chunk_size=100, chunk_overlap=20)
    chunks = chunker.chunk(sample_text, strategy="recursive")
    print(f" ✅ Created {len(chunks)} chunks")

    # 4. Add chunks to vector store
    print("\n4️⃣ Adding chunks to vector store...")
    doc_ids = vector_store.add_chunks(
        chunks,
        metadata={"source": "test_document", "doc_type": "readme"}
    )
    print(f" ✅ Added {len(doc_ids)} documents")

    # 5. Stats
    print("\n5️⃣ Vector store stats:")
    stats = vector_store.get_stats()
    for key, value in stats.items():
        print(f" {key}: {value}")

    # 6. Similarity search
    print("\n6️⃣ Testing similarity search...")
    query = "How does the RAG optimizer work?"
    results = vector_store.similarity_search(query, k=3)
    print(f" Query: '{query}'")
    print(f" Found {len(results)} results:\n")
    for i, result in enumerate(results, 1):
        print(f" Result {i}:")
        print(f" Text: {result['text'][:100]}...")
        print(f" Metadata: {result['metadata']}")
        print()

    # 7. Similarity search with scores
    print("\n7️⃣ Testing similarity search with scores...")
    results_with_scores = vector_store.similarity_search_with_score(query, k=3)
    print(f" Query: '{query}'")
    print(" Results with scores:\n")  # was an f-string with no placeholders
    for i, result in enumerate(results_with_scores, 1):
        print(f" Result {i} (score: {result['score']:.4f}):")
        print(f" Text: {result['text'][:80]}...")
        print()

    # 8. LangChain retriever interface
    print("\n8️⃣ Testing LangChain retriever...")
    retriever = vector_store.get_retriever(k=2)
    # Use invoke() instead of get_relevant_documents() for newer LangChain versions
    try:
        retrieved_docs = retriever.invoke(query)
    except AttributeError:
        # Fallback for older versions
        retrieved_docs = retriever.get_relevant_documents(query)
    print(f" ✅ Retrieved {len(retrieved_docs)} documents using LangChain retriever")

    # 9. Cleanup
    print("\n9️⃣ Cleaning up test collection...")
    vector_store.delete_collection()
    print("\n" + "=" * 80)
    print("✅ Vector store test complete!")