# RAG Capstone Project - Code Architecture & Walkthrough

## Table of Contents

1. [Project Structure](#project-structure)
2. [Core Components](#core-components)
3. [Data Flow](#data-flow)
4. [Detailed Code Walkthroughs](#detailed-code-walkthroughs)
5. [Key Classes & Methods](#key-classes--methods)
6. [Configuration System](#configuration-system)

---

## Project Structure

```
RAG Capstone Project/
├── streamlit_app.py         # Main UI application
├── api.py                   # FastAPI backend (optional)
├── llm_client.py            # Groq LLM integration
├── vector_store.py          # ChromaDB management
├── dataset_loader.py        # RAGBench dataset loading
├── embedding_models.py      # Embedding model factory
├── chunking_strategies.py   # Document chunking
├── trace_evaluator.py       # Evaluation metrics
├── config.py                # Configuration settings
├── requirements.txt         # Dependencies
└── chroma_db/               # Persistent vector store
```

---

## Core Components

### **1. Configuration (config.py)**

```python
class Settings(BaseSettings):
    """Central configuration management using Pydantic."""

    # API Configuration
    groq_api_key: str = ""          # Groq API key
    groq_rpm_limit: int = 30        # Requests per minute
    rate_limit_delay: float = 2.0   # Delay between requests

    # Storage
    chroma_persist_directory: str = "./chroma_db"

    # Available Models
    embedding_models: list = [      # 8 embedding options
        "sentence-transformers/all-mpnet-base-v2",
        "emilyalsentzer/Bio_ClinicalBERT",
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
        "sentence-transformers/all-MiniLM-L6-v2",
        "sentence-transformers/multilingual-MiniLM-L12-v2",
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "allenai/specter",
        "gemini-embedding-001"
    ]
    llm_models: list = [            # 3 LLM options
        "meta-llama/llama-4-maverick-17b-128e-instruct",
        "llama-3.1-8b-instant",
        "openai/gpt-oss-120b"
    ]
    chunking_strategies: list = [   # 4 chunking strategies
        "dense", "sparse", "hybrid", "re-ranking"
    ]
    ragbench_datasets: list = [     # 12 RAGBench datasets
        "covidqa", "cuad", "delucionqa", "emanual", "expertqa", "finqa",
        "hagrid", "hotpotqa", "msmarco", "pubmedqa", "tatqa", "techqa"
    ]
```

**Key Features:**
- ✅ Pydantic validation
- ✅ Loads from `.env` file
- ✅ Centralized settings management
- ✅ Easy to extend

---

### **2. LLM Client (llm_client.py)**

#### **A. Rate Limiter Class**

```python
from collections import deque

class RateLimiter:
    """Prevents API rate limit violations."""

    def __init__(self, max_requests_per_minute: int = 30):
        self.max_requests = max_requests_per_minute  # 30 requests/minute for Groq
        self.request_times = deque()                 # Track request timestamps

    def acquire_sync(self):
        """
        Synchronous rate limiting.

        Flow:
            1. Remove requests older than 1 minute
            2. If at the limit: calculate the wait time
            3. Sleep for the wait time
            4. Record this request

        Example:
            - At 00:00, make 30 requests
            - At 00:05, try a 31st request
            - Wait time = 60 - 5 = 55 seconds
        """
```

**Why needed?**
- Groq API has a 30 requests/minute limit
- Prevents rate limit errors
- Handles multiple concurrent requests gracefully

#### **B. GroqLLMClient Class**

```python
import time
from groq import Groq

class GroqLLMClient:
    """Main LLM interface using Groq API."""

    def __init__(self, api_key: str, model_name: str, max_rpm: int = 30,
                 rate_limit_delay: float = 2.0):
        self.client = Groq(api_key=api_key)       # Groq API client
        self.model_name = model_name              # Selected model
        self.rate_limiter = RateLimiter(max_rpm)  # Rate limiting
        self.rate_limit_delay = rate_limit_delay  # Extra delay between calls

    def generate(self, prompt: str, max_tokens: int = 1024,
                 system_prompt: str = "You are a helpful assistant.") -> str:
        """
        Generate text from a prompt.

        Execution Flow:
            1. rate_limiter.acquire_sync()            # Wait if needed
            2. self.client.chat.completions.create()  # Call Groq API
            3. time.sleep(self.rate_limit_delay)      # Additional delay
            4. Return response.choices[0].message.content
        """
        self.rate_limiter.acquire_sync()
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ]
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.7
        )
        time.sleep(self.rate_limit_delay)
        return response.choices[0].message.content
```
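The sliding-window arithmetic in `acquire_sync` is easy to get wrong, so here is a self-contained sketch of the same idea with an injectable clock so it can be exercised without real sleeping. The `SlidingWindowLimiter` and `FakeClock` names are illustrative, not part of the project's code:

```python
from collections import deque

class SlidingWindowLimiter:
    """Sketch of a sliding-window limiter: at most max_requests per 60 s."""

    def __init__(self, max_requests, clock, sleep):
        self.max_requests = max_requests
        self.clock = clock              # injectable time source (for testing)
        self.sleep = sleep              # injectable sleep (for testing)
        self.request_times = deque()

    def acquire(self):
        now = self.clock()
        # 1. Drop timestamps older than one minute
        while self.request_times and now - self.request_times[0] >= 60:
            self.request_times.popleft()
        # 2. If at the limit, wait until the oldest request ages out
        if len(self.request_times) >= self.max_requests:
            self.sleep(60 - (now - self.request_times[0]))
            now = self.clock()
        # 3. Record this request
        self.request_times.append(now)

class FakeClock:
    """Test double: time advances only when 'sleep' is called."""
    def __init__(self): self.t = 0.0
    def __call__(self): return self.t
    def sleep(self, seconds): self.t += seconds

clock = FakeClock()
limiter = SlidingWindowLimiter(30, clock, clock.sleep)
for _ in range(30):   # 30 requests at t=0: none has to wait
    limiter.acquire()
clock.t = 5.0         # 5 s later, the 31st request...
limiter.acquire()
print(clock.t)        # → 60.0  (...had to wait 60 - 5 = 55 seconds)
```

This reproduces the docstring's worked example: the 31st request at 00:05 waits 55 seconds.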
#### **C. RAGPipeline Class**

```python
class RAGPipeline:
    """Orchestrates the complete RAG workflow."""

    def __init__(self, llm_client: GroqLLMClient,
                 vector_store_manager: ChromaDBManager):
        self.llm_client = llm_client
        self.vector_store = vector_store_manager
        self.chat_history = []

    def query(self, query: str, n_results: int = 5) -> Dict:
        """
        Execute a RAG query.

        Step 1: RETRIEVAL
            retrieved_docs = vector_store.get_retrieved_documents(query, n_results=5)

        Step 2: CONTEXT AUGMENTATION
            doc_texts = [doc["document"] for doc in retrieved_docs]

        Step 3: GENERATION
            response = llm.generate_with_context(query, doc_texts)

        Step 4: HISTORY
            chat_history.append({"query": query, "response": response})

        Returns:
            {
                "query": "What is AI?",
                "response": "Generated answer...",
                "retrieved_documents": [
                    {
                        "document": "AI is...",
                        "distance": 0.123,
                        "metadata": {...}
                    },
                    ...
                ]
            }
        """
```

---

### **3. Embedding Models (embedding_models.py)**

#### **Model Types**

```python
from typing import List
import numpy as np
from sentence_transformers import SentenceTransformer

class EmbeddingModel:
    """Base class for all embedding models."""

    def embed_documents(self, texts: List[str]) -> np.ndarray:
        """Convert texts to vectors (embeddings)."""
        raise NotImplementedError

    def embed_query(self, query: str) -> np.ndarray:
        """Convert a query to a vector."""
        return self.embed_documents([query])[0]


class SentenceTransformerEmbedding(EmbeddingModel):
    """Uses pre-trained transformer models from HuggingFace."""

    def load_model(self):
        """
        Load a SentenceTransformer.

        What it does:
            1. Downloads the model from HuggingFace
            2. Loads it to GPU (if available) or CPU
            3. Sets eval mode (no dropout)

        Example:
            model = SentenceTransformer("all-mpnet-base-v2")
        """
        self.model = SentenceTransformer(self.model_name, device=self.device)

    def embed_documents(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Batch-embed documents.

        Process:
            1. Split texts into batches (32 texts at a time)
            2. For each batch: self.model.encode(batch)
            3. Stack all embeddings
            4. Return a numpy array

        Efficiency: batching prevents memory overflow.
        """
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            embeddings.append(self.model.encode(batch, convert_to_numpy=True))
        return np.vstack(embeddings)
```

#### **Factory Pattern**

```python
class EmbeddingFactory:
    """Creates the appropriate embedding model."""

    @staticmethod
    def create_embedding_model(model_name: str) -> EmbeddingModel:
        """
        Automatically select the model type.

        Logic:
            - If "Bio" or "Biomed" in name → BioMedicalEmbedding
            - If "specter" in name → SentenceTransformerEmbedding
            - Otherwise → SentenceTransformerEmbedding (default)

        Usage:
            model = EmbeddingFactory.create_embedding_model("all-mpnet-base-v2")
            embeddings = model.embed_documents(["text1", "text2"])
        """
```

---

### **4. Vector Store (vector_store.py)**

#### **ChromaDBManager Class**

```python
import chromadb
from chromadb.config import Settings as ChromaSettings  # avoid clash with config.Settings

class ChromaDBManager:
    """Manages the ChromaDB vector database."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        """
        Initialize a persistent vector database.

        Key: PersistentClient ensures data survives app restarts.

        Features:
            1. Create/use directory: ./chroma_db
            2. Initialize PersistentClient (not ephemeral)
            3. Disable telemetry (anonymized_telemetry=False)
            4. Fall back to a regular Client if needed
        """
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=ChromaSettings(anonymized_telemetry=False)
        )
        self.current_collection = None
        self.embedding_model = None

    def create_collection(self, collection_name: str,
                          embedding_model_name: str) -> chromadb.Collection:
        """
        Create a new vector collection.

        Process:
            1. Delete the collection if it exists (avoid conflicts)
            2. Load the embedding model
            3. Create the ChromaDB collection with metadata
            4. Store a reference in self.current_collection

        Collection Structure:
            {
                "name": "covidqa_dense_all_mpnet",
                "metadata": {
                    "embedding_model": "all-mpnet-base-v2",
                    "hnsw:space": "cosine"   # Similarity metric
                },
                "documents": [...],          # Document texts
                "embeddings": [...],         # Vector embeddings
                "metadatas": [...]           # Document metadata
            }
        """
```
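The collection metadata sets `hnsw:space` to `"cosine"`, so the distances returned at retrieval time are cosine distances (1 − cosine similarity; lower = more similar). A quick NumPy sanity check of the metric itself — this illustrates the math, not ChromaDB's internals:

```python
import numpy as np

def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """1 - cosine similarity: 0 for identical directions, up to 2 for opposite."""
    sim = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
    return 1.0 - sim

q        = np.array([1.0, 0.0])   # query embedding (toy 2-D example)
doc_same = np.array([2.0, 0.0])   # same direction as the query
doc_orth = np.array([0.0, 1.0])   # orthogonal to the query

print(cosine_distance(q, doc_same))  # → 0.0  (most similar)
print(cosine_distance(q, doc_orth))  # → 1.0  (unrelated)
```

Note that cosine distance ignores vector length (`doc_same` is twice as long as `q` yet scores 0.0), which is why it suits embeddings whose magnitude carries no meaning.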
```python
from typing import Dict, List, Optional

    def add_documents(self, documents: List[str],
                      metadatas: Optional[List[Dict]] = None):
        """
        Add documents to the collection.

        Steps:
            1. Generate IDs if not provided: uuid.uuid4()
            2. Generate default metadata if not provided
            3. Process in batches (prevents memory issues)
            4. Embed each batch: self.embedding_model.embed_documents()
            5. Add to the collection: self.current_collection.add()

        Code Flow:
            for batch in batches(documents, batch_size=100):
                embeddings = self.embedding_model.embed_documents(batch)
                self.current_collection.add(
                    ids=ids,
                    embeddings=embeddings,
                    documents=batch,
                    metadatas=metadatas
                )
        """

    def get_retrieved_documents(self, query: str, n_results: int = 5) -> List[Dict]:
        """
        Retrieve similar documents.

        Retrieval Process (using HNSW):
            1. Embed the query: embedding = embed_model.embed_query(query)
            2. Query the collection:
                results = collection.query(
                    query_embeddings=[embedding],
                    n_results=5,
                    include=["documents", "metadatas", "distances"]
                )
            3. Format the results and return

        Return Format:
            [
                {
                    "document": "Document text...",
                    "distance": 0.123,   # Lower = more similar
                    "metadata": {...}
                },
                ...
            ]
        """
```

---

### **5. Dataset Loader (dataset_loader.py)**

```python
class RAGBenchLoader:
    """Loads datasets from Hugging Face rungalileo/ragbench."""

    SUPPORTED_DATASETS = [
        'covidqa', 'cuad', 'delucionqa', 'emanual', 'expertqa', 'finqa',
        'hagrid', 'hotpotqa', 'msmarco', 'pubmedqa', 'tatqa', 'techqa'
    ]

    def load_dataset(self, dataset_name: str, split: str = "train",
                     max_samples: Optional[int] = None) -> List[Dict]:
        """
        Load a RAGBench dataset.

        Process:
            1. Validate the dataset name
            2. Load from HuggingFace:
               load_dataset("rungalileo/ragbench", dataset_name)
            3. Select max_samples if specified
            4. Process each item: _process_ragbench_item()
            5. Return a list of standardized dicts

        Result Format:
            [
                {
                    "question": "What is X?",
                    "answer": "X is...",
                    "documents": ["doc1", "doc2", ...],
                    "context": "combined document text",
                    "dataset": "covidqa"
                },
                ...
            ]

        Caching:
            - The first load downloads ~100MB per dataset
            - Subsequent loads use the cache
            - Cache location: ./data_cache/
        """

    def get_test_data_size(self, dataset_name: str) -> int:
        """
        Get the number of test samples without loading the full dataset.

        Efficient Approach:
            1. builder = load_dataset_builder("rungalileo/ragbench", dataset_name)
            2. Check splits: builder.info.splits
            3. Return: builder.info.splits['test'].num_examples

        Benefit: fast metadata access (~1 second) vs. a full load (~30 seconds).

        Fallback:
            - If the builder fails: load_dataset() and return len(ds)
            - If that errors too: return 100 (a reasonable default)
        """
```

---

## Data Flow

### **Complete RAG Query Flow**

```
User types a question in chat
        ↓
streamlit_app.py receives input
        ↓
RAGPipeline.query(question)
├─ STEP 1: RETRIEVAL
│  ├─ embedding_model.embed_query(question)
│  │  └─ "What is AI?" → [0.1, 0.2, 0.3, ...]
│  │
│  └─ vector_store.get_retrieved_documents(query_embedding, n_results=5)
│     └─ Search in ChromaDB collection
│        └─ Return top 5 similar documents
│
├─ STEP 2: CONTEXT PREPARATION
│  └─ Extract text from retrieved documents
│
├─ STEP 3: GENERATION
│  └─ llm_client.generate_with_context(question, doc_texts)
│     ├─ Rate limiter checks (wait if needed)
│     ├─ Send to Groq API:
│     │    "Use context to answer: [docs...] Question: [q...]"
│     └─ Return generated response
│
└─ STEP 4: RETURN RESULT
   └─ {"query": q, "response": r, "retrieved_documents": docs}
        ↓
Display in Streamlit UI
```

---

### **Collection Creation Flow**

```
User configures in sidebar:
├─ Dataset: covidqa
├─ Embedding: all-mpnet-base-v2
├─ Chunking: dense
└─ LLM: llama-3.1-8b-instant
        ↓
Click "Load Data & Create Collection"
        ↓
dataset_loader.load_dataset("covidqa", split="train", max_samples=100)
├─ Downloads the dataset from HuggingFace
├─ Processes 100 samples
└─ Returns a list of {"question", "answer", "documents", ...}
        ↓
chunking_strategy.chunk(documents, chunk_size=512, overlap=50)
├─ Split large docs into 512-token chunks
├─ Maintain 50-token overlap for context
└─ Returns a list of chunks
        ↓
vector_store.load_dataset_into_collection()
├─ Create collection: "covidqa_dense_all_mpnet"
├─ For each chunk:
│  ├─ embedding_model.embed(chunk)
│  ├─ Generate a UUID
│  └─ Store in ChromaDB
└─ Persist to ./chroma_db/ on disk
        ↓
Store references in Streamlit session state
        ↓
Ready for chat & evaluation!
```
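The 512-token / 50-token-overlap chunking step above can be sketched with whitespace-separated words standing in for real tokenizer output. The `chunk_text` helper below is illustrative only, not the project's `chunking_strategies` code; it assumes `overlap < chunk_size`:

```python
def chunk_text(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Split text into fixed-size chunks whose tails overlap for context."""
    tokens = text.split()        # stand-in for a real tokenizer
    step = chunk_size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break                # last window already reached the end
    return chunks

doc = " ".join(f"w{i}" for i in range(1200))  # a 1200-"token" document
chunks = chunk_text(doc, chunk_size=512, overlap=50)

print(len(chunks))                                         # → 3
# Consecutive chunks share exactly the 50-token overlap:
print(chunks[0].split()[-50:] == chunks[1].split()[:50])   # → True
```

The overlap means a sentence straddling a chunk boundary still appears whole in at least one chunk, at the cost of embedding some tokens twice.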
---

## Detailed Code Walkthroughs

### **Walkthrough 1: User Chats with the RAG System**

```python
# File: streamlit_app.py - chat_interface()

# Step 1: User types a query
query = st.chat_input("Ask a question...")

if query:
    # Step 2: Display the user message
    with st.chat_message("user"):
        st.write(query)

    # Step 3: Call the RAG pipeline
    result = st.session_state.rag_pipeline.query(
        query,
        n_results=5  # Retrieve top 5 docs
    )
    # Inside RAGPipeline.query():
    # - retrieved_docs = vector_store.get_retrieved_documents()
    # - doc_texts = extract texts
    # - response = llm.generate_with_context(query, doc_texts)
    # - Store in chat_history
    # - Return {"query", "response", "retrieved_documents"}

    # Step 4: Display the response
    with st.chat_message("assistant"):
        st.write(result["response"])

    # Step 5: Show the retrieved documents
    with st.expander("📄 Retrieved Documents"):
        for i, doc in enumerate(result["retrieved_documents"]):
            st.markdown(f"**Doc {i+1}** - Distance: {doc['distance']:.4f}")
            st.text_area(f"Doc {i+1}", value=doc["document"], height=100,
                         key=f"doc_{i}", label_visibility="collapsed")

    # Step 6: Store in session history
    st.session_state.chat_history.append(result)
    st.rerun()
```

### **Walkthrough 2: Run Evaluation**

```python
# File: streamlit_app.py - run_evaluation()

# Step 1: Get test data
loader = RAGBenchLoader()
test_data = loader.get_test_data("covidqa", num_samples=10)
# Returns: [{"question": "Q1", "answer": "A1"}, ...]
```
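The evaluation loop in this walkthrough ends with `evaluate_batch`, which averages each TRACE metric over the individual samples. That reduction can be checked in isolation — `average_scores` below is a stand-in sketch, not the project's `TRACEEvaluator`:

```python
from statistics import mean

def average_scores(individual_scores: list[dict]) -> dict:
    """Average each TRACE metric across samples, plus an overall mean."""
    metrics = ["utilization", "relevance", "adherence", "completeness"]
    averaged = {m: mean(s[m] for s in individual_scores) for m in metrics}
    averaged["average"] = mean(averaged[m] for m in metrics)  # mean of the 4 metrics
    averaged["num_samples"] = len(individual_scores)
    return averaged

scores = [  # two toy per-sample score dicts
    {"utilization": 0.75, "relevance": 1.0, "adherence": 0.5,  "completeness": 0.25},
    {"utilization": 0.25, "relevance": 0.5, "adherence": 1.0,  "completeness": 0.75},
]
result = average_scores(scores)
print(result["utilization"])  # → 0.5
print(result["average"])      # → 0.625
```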
```python
# Step 2: Prepare test cases
test_cases = []
for sample in test_data:
    # Query the RAG system
    result = rag_pipeline.query(sample["question"], n_results=5)

    # Create a test case
    test_case = {
        "query": sample["question"],
        "response": result["response"],
        "retrieved_documents": [doc["document"]
                                for doc in result["retrieved_documents"]],
        "ground_truth": sample.get("answer", "")
    }
    test_cases.append(test_case)

# Step 3: Run TRACE evaluation
evaluator = TRACEEvaluator()
results = evaluator.evaluate_batch(test_cases)
# Inside evaluate_batch():
#     for test_case in test_cases:
#         scores = evaluate(query, response, docs, ground_truth)
#         all_scores.append(scores)
#
#     avg_utilization = mean([s.utilization for s in all_scores])
#     avg_relevance = mean([s.relevance for s in all_scores])
#     avg_adherence = mean([s.adherence for s in all_scores])
#     avg_completeness = mean([s.completeness for s in all_scores])
#
#     return {
#         "utilization": avg_utilization,
#         "relevance": avg_relevance,
#         "adherence": avg_adherence,
#         "completeness": avg_completeness,
#         "average": mean of the 4 metrics above,
#         "num_samples": 10,
#         "individual_scores": [scores for each sample]
#     }

# Step 4: Display the results
st.metric("Utilization", f"{results['utilization']:.3f}")
st.metric("Relevance", f"{results['relevance']:.3f}")
st.metric("Adherence", f"{results['adherence']:.3f}")
st.metric("Completeness", f"{results['completeness']:.3f}")
```

---

## Key Classes & Methods

### **Session State Management (Streamlit)**

```python
# File: streamlit_app.py - initialization

# Session state stores state between reruns
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "rag_pipeline" not in st.session_state:
    st.session_state.rag_pipeline = None
if "evaluation_results" not in st.session_state:
    st.session_state.evaluation_results = None
if "dataset_name" not in st.session_state:
    st.session_state.dataset_name = None
```
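The guards above create each key only once per session. What they buy can be demonstrated with a toy model: a module-level dict standing in for `st.session_state`, with repeated function calls standing in for Streamlit reruns. This is an illustration of the pattern only, not Streamlit's implementation:

```python
session_state: dict = {}   # survives across "reruns", like st.session_state

def rerun_script(user_message: str) -> list[str]:
    """One 'rerun' of the app script: locals reset, session_state persists."""
    chat_history = []                             # local: reset on every rerun
    session_state.setdefault("chat_history", [])  # persistent: created once
    chat_history.append(user_message)
    session_state["chat_history"].append(user_message)
    return chat_history

rerun_script("hello")
local_history = rerun_script("how are you?")

print(len(local_history))                  # → 1  (the local list was reset)
print(len(session_state["chat_history"]))  # → 2  (the persistent list kept both)
```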
**Why session state?**
- Streamlit reruns the entire script on every interaction
- Session state preserves data across reruns
- Without it, chat history would reset after every message

### **Main UI Tabs**

```python
# File: streamlit_app.py
tab1, tab2, tab3 = st.tabs(["💬 Chat", "📊 Evaluation", "📜 History"])

with tab1:
    chat_interface()        # Conversational interface
with tab2:
    evaluation_interface()  # Run TRACE evaluation
with tab3:
    history_interface()     # View & export chat history
```

---

## Configuration System

### **How Settings Are Used**

```python
# config.py
settings = Settings()  # Loads from .env and defaults

# Usage in other files
from config import settings

# In llm_client.py
client = GroqLLMClient(
    api_key=settings.groq_api_key,
    max_rpm=settings.groq_rpm_limit
)

# In vector_store.py
vector_store = ChromaDBManager(settings.chroma_persist_directory)

# In streamlit_app.py
dataset_options = st.selectbox("Choose Dataset", settings.ragbench_datasets)
```

---

## Summary: Code Architecture

```
┌─────────────────────────────────────────────────────────┐
│            STREAMLIT UI (streamlit_app.py)              │
│   ┌───────────┬──────────────┬──────────────┐           │
│   │ Chat Tab  │   Eval Tab   │ History Tab  │           │
│   └───────────┴──────────────┴──────────────┘           │
└────────────────────┬────────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
 [RAG Pipeline] [TRACE Eval]  [History]
        │
   ┌────┴───────────┬─────────────────┐
   ▼                ▼                 ▼
[LLM Client]    [Vector Store]   [Dataset Loader]
├─ Rate Limiter ├─ ChromaDB      ├─ RAGBench
├─ Groq API     ├─ Embedding     ├─ Process data
└─ Generation   └─ Retrieval     └─ Cache
   │
   ┌────────────────────────┬──────────────────┐
   ▼                        ▼                  ▼
[Embedding Models]     [Chunking Strategies] [Config]
├─ SentenceTransformer ├─ Dense              ├─ API Keys
├─ BioMedical BERT     ├─ Sparse             ├─ Models
└─ Multiple options    ├─ Hybrid             ├─ Settings
                       └─ Re-ranking         └─ Paths
```

---

## Quick Reference

| Component | Purpose | Key File |
|-----------|---------|----------|
| **Streamlit App** | User interface | streamlit_app.py |
| **RAG Pipeline** | Orchestrates query flow | llm_client.py |
| **LLM Client** | Generates responses | llm_client.py |
| **Vector Store** | Stores & retrieves embeddings | vector_store.py |
| **Embeddings** | Converts text to vectors | embedding_models.py |
| **Datasets** | Loads RAGBench datasets | dataset_loader.py |
| **Chunking** | Splits documents | chunking_strategies.py |
| **Evaluation** | TRACE metrics | trace_evaluator.py |
| **Config** | Settings management | config.py |

---

This comprehensive guide covers the architecture, data flow, and key components of your RAG application! 🚀