RAG Capstone Project - Code Architecture & Walkthrough
Table of Contents
- Project Structure
- Core Components
- Data Flow
- Detailed Code Walkthroughs
- Key Classes & Methods
- Configuration System
Project Structure
```
RAG Capstone Project/
├── streamlit_app.py         # Main UI application
├── api.py                   # FastAPI backend (optional)
├── llm_client.py            # Groq LLM integration
├── vector_store.py          # ChromaDB management
├── dataset_loader.py        # RAGBench dataset loading
├── embedding_models.py      # Embedding model factory
├── chunking_strategies.py   # Document chunking
├── trace_evaluator.py       # Evaluation metrics
├── config.py                # Configuration settings
├── requirements.txt         # Dependencies
└── chroma_db/               # Persistent vector store
```
Core Components
1. Configuration (config.py)
```python
class Settings(BaseSettings):
    """Central configuration management using Pydantic."""

    # API Configuration
    groq_api_key: str = ""          # Groq API key
    groq_rpm_limit: int = 30        # Requests per minute
    rate_limit_delay: float = 2.0   # Delay between requests

    # Storage
    chroma_persist_directory: str = "./chroma_db"

    # Available Models
    embedding_models: list = [      # 8 embedding options
        "sentence-transformers/all-mpnet-base-v2",
        "emilyalsentzer/Bio_ClinicalBERT",
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
        "sentence-transformers/all-MiniLM-L6-v2",
        "sentence-transformers/multilingual-MiniLM-L12-v2",
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "allenai/specter",
        "gemini-embedding-001",
    ]
    llm_models: list = [            # 3 LLM options
        "meta-llama/llama-4-maverick-17b-128e-instruct",
        "llama-3.1-8b-instant",
        "openai/gpt-oss-120b",
    ]
    chunking_strategies: list = [   # 4 chunking strategies
        "dense", "sparse", "hybrid", "re-ranking"
    ]
    ragbench_datasets: list = [     # 12 RAGBench datasets
        "covidqa", "cuad", "delucionqa", "emanual",
        "expertqa", "finqa", "hagrid", "hotpotqa",
        "msmarco", "pubmedqa", "tatqa", "techqa",
    ]
```
Key Features:
- ✅ Pydantic validation
- ✅ Loads from `.env` file
- ✅ Centralized settings management
- ✅ Easy to extend
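For reference, a minimal `.env` file for these settings might look like the following (the key value is a placeholder; by default, Pydantic's `BaseSettings` matches environment variables to field names case-insensitively):

```
GROQ_API_KEY=gsk_xxxxxxxxxxxxxxxx
GROQ_RPM_LIMIT=30
RATE_LIMIT_DELAY=2.0
CHROMA_PERSIST_DIRECTORY=./chroma_db
```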
2. LLM Client (llm_client.py)
A. Rate Limiter Class
```python
from collections import deque

class RateLimiter:
    """Prevents API rate limit violations."""

    def __init__(self, max_requests_per_minute: int = 30):
        self.max_requests = max_requests_per_minute  # e.g. 30 requests/minute for Groq
        self.request_times = deque()                 # Track request timestamps

    def acquire_sync(self):
        """
        Synchronous rate limiting.

        Flow:
        1. Remove requests older than 1 minute
        2. If at the limit: calculate the wait time
        3. Sleep for the wait time
        4. Record this request

        Example:
        - At 00:00 make 30 requests
        - At 00:05 try the 31st request
        - Wait time = 60 - 5 = 55 seconds
        """
```
Why needed?
- Groq API has 30 requests/minute limit
- Prevents rate limit errors
- Handles multiple concurrent requests gracefully
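The body of `acquire_sync` is not shown above; a minimal sketch implementing the sliding-window flow described in the docstring might look like this:

```python
import time

# RateLimiter.acquire_sync — a minimal sketch of the flow described above
def acquire_sync(self) -> None:
    now = time.monotonic()
    # 1. Drop request timestamps older than the 60-second window
    while self.request_times and now - self.request_times[0] >= 60:
        self.request_times.popleft()
    # 2./3. At the limit: sleep until the oldest request ages out of the window
    if len(self.request_times) >= self.max_requests:
        wait_time = 60 - (now - self.request_times[0])
        if wait_time > 0:
            time.sleep(wait_time)
    # 4. Record this request
    self.request_times.append(time.monotonic())
```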
B. GroqLLMClient Class
```python
import time
from groq import Groq

class GroqLLMClient:
    """Main LLM interface using the Groq API."""

    def __init__(self, api_key: str, model_name: str, max_rpm: int = 30,
                 rate_limit_delay: float = 2.0):
        self.client = Groq(api_key=api_key)        # Groq API client
        self.model_name = model_name               # Selected model
        self.rate_limiter = RateLimiter(max_rpm)   # Rate limiting
        self.rate_limit_delay = rate_limit_delay   # Extra delay per call (settings.rate_limit_delay)

    def generate(self, prompt: str, max_tokens: int = 1024,
                 system_prompt: str = "You are a helpful assistant.") -> str:
        # Note: the system_prompt default above is assumed; the original default is not shown.
        """
        Generate text from a prompt.

        Execution Flow:
        1. rate_limiter.acquire_sync()             # Wait if needed
        2. self.client.chat.completions.create()   # Call Groq API
        3. time.sleep(rate_limit_delay)            # Additional delay
        4. Return response.choices[0].message.content
        """
        self.rate_limiter.acquire_sync()
        messages = [{"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}]
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.7,
        )
        time.sleep(self.rate_limit_delay)
        return response.choices[0].message.content
```
C. RAGPipeline Class
```python
class RAGPipeline:
    """Orchestrates the complete RAG workflow."""

    def __init__(self, llm_client: GroqLLMClient,
                 vector_store_manager: ChromaDBManager):
        self.llm_client = llm_client
        self.vector_store = vector_store_manager
        self.chat_history = []

    def query(self, query: str, n_results: int = 5) -> Dict:
        """
        Execute a RAG query.

        Step 1: RETRIEVAL
        -----------------
        retrieved_docs = vector_store.get_retrieved_documents(query, n_results=5)

        Step 2: CONTEXT AUGMENTATION
        ----------------------------
        doc_texts = [doc["document"] for doc in retrieved_docs]

        Step 3: GENERATION
        ------------------
        response = llm.generate_with_context(query, doc_texts)

        Step 4: HISTORY
        ---------------
        chat_history.append({"query": query, "response": response})

        Return:
        {
            "query": "What is AI?",
            "response": "Generated answer...",
            "retrieved_documents": [
                {
                    "document": "AI is...",
                    "distance": 0.123,
                    "metadata": {...}
                },
                ...
            ]
        }
        """
```
3. Embedding Models (embedding_models.py)
Model Types
```python
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer

class EmbeddingModel:
    """Base class for all embedding models."""

    def embed_documents(self, texts: List[str]) -> np.ndarray:
        """Convert texts to vectors (embeddings)."""
        raise NotImplementedError

    def embed_query(self, query: str) -> np.ndarray:
        """Convert a query to a vector."""
        return self.embed_documents([query])[0]


class SentenceTransformerEmbedding(EmbeddingModel):
    """Uses pre-trained transformer models from HuggingFace."""

    def load_model(self):
        """
        Load a SentenceTransformer model.

        What it does:
        1. Downloads the model from HuggingFace
        2. Loads to GPU (if available) or CPU
        3. Sets eval mode (no dropout)

        Example:
            model = SentenceTransformer("all-mpnet-base-v2")
        """
        self.model = SentenceTransformer(self.model_name, device=self.device)

    def embed_documents(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Batch-embed documents.

        Process:
        1. Split texts into batches (32 texts at a time)
        2. For each batch: self.model.encode(batch)
        3. Stack all embeddings
        4. Return a numpy array

        Efficiency: batching prevents memory overflow.
        """
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            embeddings.append(self.model.encode(batch, convert_to_numpy=True))
        return np.vstack(embeddings)
```
Factory Pattern
```python
class EmbeddingFactory:
    """Creates the appropriate embedding model."""

    @staticmethod
    def create_embedding_model(model_name: str) -> EmbeddingModel:
        """
        Automatically select the model type.

        Logic:
        - If "Bio" or "Biomed" in name → BioMedicalEmbedding
        - If "specter" → SentenceTransformerEmbedding
        - Otherwise → SentenceTransformerEmbedding (default)

        Usage:
            model = EmbeddingFactory.create_embedding_model("all-mpnet-base-v2")
            embeddings = model.embed_documents(["text1", "text2"])
        """
```
4. Vector Store (vector_store.py)
ChromaDBManager Class
```python
import chromadb
from chromadb.config import Settings

class ChromaDBManager:
    """Manages the ChromaDB vector database."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        """
        Initialize the persistent vector database.

        Key: PersistentClient ensures data survives app restarts.

        Features:
        1. Create/use directory: ./chroma_db
        2. Initialize PersistentClient (not ephemeral)
        3. Disable anonymized telemetry
        4. Fall back to a regular Client if needed
        """
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.current_collection = None
        self.embedding_model = None
```
```python
# ChromaDBManager (continued)
def create_collection(self, collection_name: str,
                      embedding_model_name: str) -> chromadb.Collection:
    """
    Create a new vector collection.

    Process:
    1. Delete the collection if it already exists (avoid conflicts)
    2. Load the embedding model
    3. Create the ChromaDB collection with metadata
    4. Store a reference in self.current_collection

    Collection Structure:
    {
        "name": "wiki_qa_dense_all_mpnet",
        "metadata": {
            "embedding_model": "all-mpnet-base-v2",
            "hnsw:space": "cosine"   # Similarity metric
        },
        "documents": [...],    # Document texts
        "embeddings": [...],   # Vector embeddings
        "metadatas": [...]     # Document metadata
    }
    """
```
```python
# ChromaDBManager (continued)
def add_documents(self, documents: List[str],
                  metadatas: Optional[List[Dict]] = None):
    """
    Add documents to the collection.

    Steps:
    1. Generate IDs if not provided: uuid.uuid4()
    2. Generate default metadata if not provided
    3. Process in batches (prevents memory issues)
    4. Embed each batch: self.embedding_model.embed_documents()
    5. Add to the collection: self.current_collection.add()

    Code Flow:
        for batch in batches(documents, batch_size=100):
            embeddings = self.embedding_model.embed_documents(batch)
            self.current_collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=batch,
                metadatas=metadatas
            )
    """
```
```python
# ChromaDBManager (continued)
def get_retrieved_documents(self, query: str,
                            n_results: int = 5) -> List[Dict]:
    """
    Retrieve similar documents.

    Retrieval Process (using HNSW):
    1. Embed the query: embedding = embed_model.embed_query(query)
    2. Query the collection:
        results = collection.query(
            query_embeddings=[embedding],
            n_results=5,
            include=["documents", "metadatas", "distances"]
        )
    3. Format the results and return

    Return Format:
    [
        {
            "document": "Document text...",
            "distance": 0.123,   # Lower = more similar
            "metadata": {...}
        },
        ...
    ]
    """
```
5. Dataset Loader (dataset_loader.py)
```python
class RAGBenchLoader:
    """Loads datasets from Hugging Face rungalileo/ragbench."""

    SUPPORTED_DATASETS = [
        'covidqa', 'cuad', 'delucionqa', 'emanual',
        'expertqa', 'finqa', 'hagrid', 'hotpotqa',
        'msmarco', 'pubmedqa', 'tatqa', 'techqa'
    ]

    def load_dataset(self, dataset_name: str, split: str = "train",
                     max_samples: Optional[int] = None) -> List[Dict]:
        """
        Load a RAGBench dataset.

        Process:
        1. Validate the dataset name
        2. Load from HuggingFace: load_dataset("rungalileo/ragbench", dataset_name)
        3. Select max_samples if specified
        4. Process each item: _process_ragbench_item()
        5. Return a list of standardized dicts

        Result Format:
        [
            {
                "question": "What is X?",
                "answer": "X is...",
                "documents": ["doc1", "doc2", ...],
                "context": "combined document text",
                "dataset": "wiki_qa"
            },
            ...
        ]

        Caching:
        - First load downloads ~100MB per dataset
        - Subsequent loads use the cache
        - Cache location: ./data_cache/
        """
```
```python
# RAGBenchLoader (continued)
def get_test_data_size(self, dataset_name: str) -> int:
    """
    Get the number of available test samples without loading the full dataset.

    Efficient Approach:
    1. builder = load_dataset_builder("rungalileo/ragbench", dataset_name)
    2. Check splits: builder.info.splits
    3. Return: builder.info.splits['test'].num_examples

    Benefit: fast metadata access (~1 second)
    vs. a full load (~30 seconds)

    Fallback:
    - If the builder fails: load_dataset() and return len(ds)
    - On error: return 100 (a reasonable default)
    """
```
Data Flow
Complete RAG Query Flow
```
User Types Question in Chat
            ↓
streamlit_app.py receives input
            ↓
RAGPipeline.query(question)
 ├─ STEP 1: RETRIEVAL
 │   ├─ embedding_model.embed_query(question)
 │   │    └─ "What is AI?" → [0.1, 0.2, 0.3, ...]
 │   │
 │   └─ vector_store.get_retrieved_documents(query_embedding, n_results=5)
 │        ├─ Search in ChromaDB collection
 │        └─ Return top 5 similar documents
 │
 ├─ STEP 2: CONTEXT PREPARATION
 │   └─ Extract text from retrieved documents
 │
 ├─ STEP 3: GENERATION
 │   └─ llm_client.generate_with_context(question, doc_texts)
 │        ├─ Rate limiter checks (wait if needed)
 │        ├─ Send to Groq API:
 │        │    "Use context to answer: [docs...] Question: [q...]"
 │        └─ Return generated response
 │
 └─ STEP 4: RETURN RESULT
      └─ {"query": q, "response": r, "retrieved_documents": docs}
            ↓
Display in Streamlit UI
```
Collection Creation Flow
```
User configures in sidebar:
 ├─ Dataset: wiki_qa
 ├─ Embedding: all-mpnet-base-v2
 ├─ Chunking: dense
 └─ LLM: llama-3.1-8b
            ↓
Click "Load Data & Create Collection"
            ↓
dataset_loader.load_dataset("wiki_qa", split="train", max_samples=100)
 ├─ Downloads dataset from HuggingFace
 ├─ Processes 100 samples
 └─ Returns list of {"question", "answer", "documents", ...}
            ↓
chunking_strategy.chunk(documents, chunk_size=512, overlap=50)
 ├─ Split large docs into 512-token chunks
 ├─ Maintain 50-token overlap for context
 └─ Returns list of chunks
            ↓
vector_store.load_dataset_into_collection()
 ├─ Create collection: "wiki_qa_dense_all_mpnet"
 ├─ For each chunk:
 │   ├─ embedding_model.embed(chunk)
 │   ├─ Generate UUID
 │   └─ Store in ChromaDB
 └─ Persist to ./chroma_db/ on disk
            ↓
Store references in Streamlit session state
            ↓
Ready for chat & evaluation!
```
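The chunking step in the flow above is described but not shown in this guide; a minimal sketch of a fixed-size chunker with overlap (word-based here for simplicity; the real `chunking_strategies.py` may tokenize differently):

```python
from typing import List

def chunk(documents: List[str], chunk_size: int = 512, overlap: int = 50) -> List[str]:
    """Split documents into fixed-size chunks with overlap (sketch)."""
    chunks = []
    step = chunk_size - overlap  # advance so consecutive chunks share `overlap` tokens
    for doc in documents:
        words = doc.split()      # crude word-level "tokens" for illustration
        for start in range(0, len(words), step):
            piece = words[start:start + chunk_size]
            if piece:
                chunks.append(" ".join(piece))
            if start + chunk_size >= len(words):
                break            # final chunk reached; avoid tiny trailing duplicates
    return chunks
```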
Detailed Code Walkthroughs
Walkthrough 1: User Chats with RAG System
```python
# File: streamlit_app.py - chat_interface()

# Step 1: User types query
query = st.chat_input("Ask a question...")

if query:
    # Step 2: Display user message
    with st.chat_message("user"):
        st.write(query)

    # Step 3: Call RAG pipeline
    result = st.session_state.rag_pipeline.query(
        query,
        n_results=5  # Retrieve top 5 docs
    )
    # Inside RAGPipeline.query():
    # - retrieved_docs = vector_store.get_retrieved_documents()
    # - doc_texts = extract texts
    # - response = llm.generate_with_context(query, doc_texts)
    # - Store in chat_history
    # - Return {"query", "response", "retrieved_documents"}

    # Step 4: Display response
    with st.chat_message("assistant"):
        st.write(result["response"])

        # Step 5: Show retrieved documents
        with st.expander("📄 Retrieved Documents"):
            for i, doc in enumerate(result["retrieved_documents"]):
                st.markdown(f"**Doc {i+1}** - Distance: {doc['distance']:.4f}")
                st.text_area("", value=doc["document"], height=100)

    # Step 6: Store in session history
    st.session_state.chat_history.append(result)
    st.rerun()
```
Walkthrough 2: Run Evaluation
```python
# File: streamlit_app.py - run_evaluation()

# Step 1: Get test data
loader = RAGBenchLoader()
test_data = loader.get_test_data("wiki_qa", num_samples=10)
# Returns: [{"question": "Q1", "answer": "A1"}, ...]

# Step 2: Prepare test cases
test_cases = []
for sample in test_data:
    # Query the RAG system
    result = rag_pipeline.query(sample["question"], n_results=5)

    # Create the test case
    test_case = {
        "query": sample["question"],
        "response": result["response"],
        "retrieved_documents": [doc["document"] for doc in result["retrieved_documents"]],
        "ground_truth": sample.get("answer", "")
    }
    test_cases.append(test_case)

# Step 3: Run TRACE evaluation
evaluator = TRACEEvaluator()
results = evaluator.evaluate_batch(test_cases)
# Inside evaluate_batch():
#     for test_case in test_cases:
#         scores = evaluate(query, response, docs, ground_truth)
#         all_scores.append(scores)
#
#     avg_utilization = mean([s.utilization for s in all_scores])
#     avg_relevance = mean([s.relevance for s in all_scores])
#     avg_adherence = mean([s.adherence for s in all_scores])
#     avg_completeness = mean([s.completeness for s in all_scores])
#
#     return {
#         "utilization": avg_utilization,
#         "relevance": avg_relevance,
#         "adherence": avg_adherence,
#         "completeness": avg_completeness,
#         "average": average of the 4 metrics,
#         "num_samples": 10,
#         "individual_scores": [scores for each sample]
#     }

# Step 4: Display results
st.metric("Utilization", f"{results['utilization']:.3f}")
st.metric("Relevance", f"{results['relevance']:.3f}")
st.metric("Adherence", f"{results['adherence']:.3f}")
st.metric("Completeness", f"{results['completeness']:.3f}")
```
Key Classes & Methods
Session State Management (Streamlit)
```python
# File: streamlit_app.py - initialization

# Session state persists data between reruns
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "rag_pipeline" not in st.session_state:
    st.session_state.rag_pipeline = None
if "evaluation_results" not in st.session_state:
    st.session_state.evaluation_results = None
if "dataset_name" not in st.session_state:
    st.session_state.dataset_name = None

# Why session state?
# - Streamlit reruns the entire script on every interaction
# - Session state preserves data across reruns
# - Without it: chat history would reset after every message
```
Main UI Tabs
```python
# File: streamlit_app.py
tab1, tab2, tab3 = st.tabs(["💬 Chat", "📊 Evaluation", "📜 History"])

with tab1:
    chat_interface()        # Conversational interface
with tab2:
    evaluation_interface()  # Run TRACE evaluation
with tab3:
    history_interface()     # View & export chat history
```
Configuration System
How Settings Are Used
```python
# config.py
settings = Settings()  # Loads from .env and defaults

# Usage in other files
from config import settings

# In llm_client.py
client = GroqLLMClient(
    api_key=settings.groq_api_key,
    max_rpm=settings.groq_rpm_limit
)

# In vector_store.py
vector_store = ChromaDBManager(settings.chroma_persist_directory)

# In streamlit_app.py
dataset_options = st.selectbox("Choose Dataset", settings.ragbench_datasets)
```
Summary: Code Architecture
```
┌────────────────────────────────────────────────────┐
│          STREAMLIT UI (streamlit_app.py)           │
│  ┌────────────┬───────────────┬───────────────┐    │
│  │  Chat Tab  │   Eval Tab    │  History Tab  │    │
│  └────────────┴───────────────┴───────────────┘    │
└─────────────────────────┬──────────────────────────┘
                          │
           ┌──────────────┼──────────────┐
           ▼              ▼              ▼
    [RAG Pipeline]   [TRACE Eval]    [History]
           │              │              │
           ▼              ▼              ▼
    [LLM Client]    [Vector Store]  [Dataset Loader]
     ├─ Rate Limiter  ├─ ChromaDB     ├─ RAGBench
     ├─ Groq API      ├─ Embedding    ├─ Process data
     └─ Generation    └─ Retrieval    └─ Cache
           │              │              │
           ▼              ▼              ▼
 [Embedding Models]  [Chunking Strategies]  [Config]
  ├─ SentenceTransformer  ├─ Dense           ├─ API Keys
  ├─ BioMedical BERT      ├─ Sparse          ├─ Models
  └─ Multiple options     ├─ Hybrid          ├─ Settings
                          └─ Re-ranking      └─ Paths
```
Quick Reference
| Component | Purpose | Key File |
|---|---|---|
| Streamlit App | User interface | streamlit_app.py |
| RAG Pipeline | Orchestrates query flow | llm_client.py |
| LLM Client | Generates responses | llm_client.py |
| Vector Store | Stores & retrieves embeddings | vector_store.py |
| Embeddings | Converts text to vectors | embedding_models.py |
| Datasets | Loads RAGBench datasets | dataset_loader.py |
| Chunking | Splits documents | chunking_strategies.py |
| Evaluation | TRACE metrics | trace_evaluator.py |
| Config | Settings management | config.py |
This comprehensive guide covers the architecture, data flow, and key components of your RAG application!