# RAG Capstone Project - Code Architecture & Walkthrough

## Table of Contents
1. [Project Structure](#project-structure)
2. [Core Components](#core-components)
3. [Data Flow](#data-flow)
4. [Detailed Code Walkthroughs](#detailed-code-walkthroughs)
5. [Key Classes & Methods](#key-classes--methods)
6. [Configuration System](#configuration-system)

---

## Project Structure
```
RAG Capstone Project/
├── streamlit_app.py          # Main UI application
├── api.py                    # FastAPI backend (optional)
├── llm_client.py             # Groq LLM integration
├── vector_store.py           # ChromaDB management
├── dataset_loader.py         # RAGBench dataset loading
├── embedding_models.py       # Embedding model factory
├── chunking_strategies.py    # Document chunking
├── trace_evaluator.py        # Evaluation metrics
├── config.py                 # Configuration settings
├── requirements.txt          # Dependencies
└── chroma_db/                # Persistent vector store
```
---

## Core Components

### **1. Configuration (config.py)**
```python
from pydantic_settings import BaseSettings  # Pydantic v2; on v1: from pydantic import BaseSettings

class Settings(BaseSettings):
    """Central configuration management using Pydantic."""

    # API Configuration
    groq_api_key: str = ""          # Groq API key
    groq_rpm_limit: int = 30        # Requests per minute
    rate_limit_delay: float = 2.0   # Delay between requests (seconds)

    # Storage
    chroma_persist_directory: str = "./chroma_db"

    # Available Models
    embedding_models: list = [      # 8 embedding options
        "sentence-transformers/all-mpnet-base-v2",
        "emilyalsentzer/Bio_ClinicalBERT",
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
        "sentence-transformers/all-MiniLM-L6-v2",
        "sentence-transformers/multilingual-MiniLM-L12-v2",
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "allenai/specter",
        "gemini-embedding-001"
    ]
    llm_models: list = [            # 3 LLM options
        "meta-llama/llama-4-maverick-17b-128e-instruct",
        "llama-3.1-8b-instant",
        "openai/gpt-oss-120b"
    ]
    chunking_strategies: list = [   # 4 chunking strategies
        "dense", "sparse", "hybrid", "re-ranking"
    ]
    ragbench_datasets: list = [     # 12 RAGBench datasets
        "covidqa", "cuad", "delucionqa", "emanual",
        "expertqa", "finqa", "hagrid", "hotpotqa",
        "msmarco", "pubmedqa", "tatqa", "techqa"
    ]
```
**Key Features:**
- ✅ Pydantic validation
- ✅ Loads from `.env` file
- ✅ Centralized settings management
- ✅ Easy to extend
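
A hedged sketch of how the `.env` loading is typically wired up with `pydantic-settings`; the `env_file` option and the `settings` singleton are assumptions here, only the field names come from the class above:

```python
# Minimal sketch, assuming env_file loading is enabled in config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")  # assumed option
    groq_api_key: str = ""
    groq_rpm_limit: int = 30

settings = Settings()  # reads GROQ_API_KEY / GROQ_RPM_LIMIT from .env if present
```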
---

### **2. LLM Client (llm_client.py)**

#### **A. Rate Limiter Class**
```python
from collections import deque

class RateLimiter:
    """Prevents API rate limit violations."""

    def __init__(self, max_requests_per_minute: int = 30):
        self.max_requests = max_requests_per_minute  # e.g. 30 requests/minute for Groq
        self.request_times = deque()                 # Track request timestamps

    def acquire_sync(self):
        """
        Synchronous rate limiting.

        Flow:
        1. Remove requests older than 1 minute
        2. If at the limit: calculate the wait time
        3. Sleep for the wait time
        4. Record this request

        Example:
        - At 00:00 make 30 requests
        - At 00:05 try the 31st request
        - Wait time = 60 - 5 = 55 seconds
        """
```
**Why needed?**
- The Groq API enforces a 30 requests/minute limit
- Prevents rate limit errors
- Handles multiple concurrent requests gracefully
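
A self-contained sketch of the sliding-window logic the docstring describes; the flow comes from the source, the exact code is an illustration:

```python
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests_per_minute: int = 30):
        self.max_requests = max_requests_per_minute
        self.request_times = deque()

    def acquire_sync(self):
        now = time.time()
        # 1. Drop timestamps older than the 60-second window
        while self.request_times and now - self.request_times[0] > 60:
            self.request_times.popleft()
        # 2-3. If at the limit, sleep until the oldest request ages out
        if len(self.request_times) >= self.max_requests:
            wait = 60 - (now - self.request_times[0])
            if wait > 0:
                time.sleep(wait)
        # 4. Record this request
        self.request_times.append(time.time())
```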
#### **B. GroqLLMClient Class**
```python
import time

from groq import Groq

class GroqLLMClient:
    """Main LLM interface using the Groq API."""

    def __init__(self, api_key: str, model_name: str, max_rpm: int = 30,
                 rate_limit_delay: float = 2.0):
        self.client = Groq(api_key=api_key)        # Groq API client
        self.model_name = model_name               # Selected model
        self.rate_limiter = RateLimiter(max_rpm)   # Rate limiting
        self.rate_limit_delay = rate_limit_delay   # Extra delay after each call

    def generate(self, prompt: str, max_tokens: int = 1024,
                 system_prompt: str = "You are a helpful assistant.") -> str:
        """
        Generate text from a prompt.

        Execution flow:
        1. rate_limiter.acquire_sync()              # Wait if needed
        2. self.client.chat.completions.create()    # Call the Groq API
        3. time.sleep(self.rate_limit_delay)        # Additional delay
        4. Return response.choices[0].message.content
        """
        self.rate_limiter.acquire_sync()
        messages = [{"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}]
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.7
        )
        time.sleep(self.rate_limit_delay)
        return response.choices[0].message.content
```
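
A hypothetical usage snippet; the model name is one of the options from `config.py`, and `settings` is the singleton shown in the Configuration System section below:

```python
from config import settings

llm = GroqLLMClient(
    api_key=settings.groq_api_key,
    model_name="llama-3.1-8b-instant",  # any model from settings.llm_models
    max_rpm=settings.groq_rpm_limit,
)
answer = llm.generate("Summarize retrieval-augmented generation in one sentence.")
print(answer)
```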
#### **C. RAGPipeline Class**
```python
class RAGPipeline:
    """Orchestrates the complete RAG workflow."""

    def __init__(self, llm_client: GroqLLMClient,
                 vector_store_manager: ChromaDBManager):
        self.llm_client = llm_client
        self.vector_store = vector_store_manager
        self.chat_history = []

    def query(self, query: str, n_results: int = 5) -> Dict:
        """
        Execute a RAG query.

        Step 1: RETRIEVAL
        ─────────────────
        retrieved_docs = vector_store.get_retrieved_documents(query, n_results=5)

        Step 2: CONTEXT AUGMENTATION
        ────────────────────────────
        doc_texts = [doc["document"] for doc in retrieved_docs]

        Step 3: GENERATION
        ──────────────────
        response = llm.generate_with_context(query, doc_texts)

        Step 4: HISTORY
        ───────────────
        chat_history.append({"query": query, "response": response})

        Returns: {
            "query": "What is AI?",
            "response": "Generated answer...",
            "retrieved_documents": [
                {
                    "document": "AI is...",
                    "distance": 0.123,
                    "metadata": {...}
                },
                ...
            ]
        }
        """
```
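
The docstring maps almost one-to-one onto a method body; a sketch, assuming the method names shown above (not the verified source):

```python
from typing import Dict

def query(self, query: str, n_results: int = 5) -> Dict:
    # Step 1: retrieve the most similar chunks
    retrieved_docs = self.vector_store.get_retrieved_documents(query, n_results=n_results)
    # Step 2: pull the raw text out of each hit
    doc_texts = [doc["document"] for doc in retrieved_docs]
    # Step 3: generate an answer grounded in that context
    response = self.llm_client.generate_with_context(query, doc_texts)
    # Step 4: record the turn and return the full result
    self.chat_history.append({"query": query, "response": response})
    return {"query": query, "response": response,
            "retrieved_documents": retrieved_docs}
```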
---

### **3. Embedding Models (embedding_models.py)**

#### **Model Types**
```python
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer

class EmbeddingModel:
    """Base class for all embedding models."""

    def embed_documents(self, texts: List[str]) -> np.ndarray:
        """Convert texts to vectors (embeddings)."""
        raise NotImplementedError

    def embed_query(self, query: str) -> np.ndarray:
        """Convert a query to a vector."""
        return self.embed_documents([query])[0]

class SentenceTransformerEmbedding(EmbeddingModel):
    """Uses pre-trained transformer models from HuggingFace."""

    def load_model(self):
        """
        Load a SentenceTransformer:
        1. Downloads the model from HuggingFace
        2. Loads to GPU (if available) or CPU
        3. Sets to eval mode (no dropout)

        Example:
            model = SentenceTransformer("all-mpnet-base-v2")
        """
        self.model = SentenceTransformer(self.model_name, device=self.device)

    def embed_documents(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Batch-embed documents:
        1. Split texts into batches (32 texts at a time)
        2. For each batch: self.model.encode(batch)
        3. Stack all embeddings
        4. Return a numpy array

        Efficiency: batching prevents memory overflow.
        """
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            embeddings.append(self.model.encode(batch, convert_to_numpy=True))
        return np.vstack(embeddings)
```
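
A usage sketch; the excerpt above never shows `__init__`, so the constructor here is a hypothetical stand-in that just sets `model_name` and `device` before calling `load_model()`:

```python
# Hypothetical constructor for illustration only
class MiniLMEmbedding(SentenceTransformerEmbedding):
    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2", device="cpu"):
        self.model_name = model_name
        self.device = device
        self.load_model()

emb = MiniLMEmbedding()
doc_vecs = emb.embed_documents(["AI is the study of agents.", "Paris is in France."])
print(doc_vecs.shape)                         # (2, 384) -- MiniLM-L6-v2 is 384-dim
print(emb.embed_query("What is AI?").shape)   # (384,)
```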
#### **Factory Pattern**
```python
class EmbeddingFactory:
    """Creates the appropriate embedding model."""

    @staticmethod
    def create_embedding_model(model_name: str) -> EmbeddingModel:
        """
        Automatically select the model type.

        Logic:
        - If "Bio" or "Biomed" in name → BioMedicalEmbedding
        - If "specter" → SentenceTransformerEmbedding
        - Otherwise → SentenceTransformerEmbedding (default)

        Usage:
            model = EmbeddingFactory.create_embedding_model("all-mpnet-base-v2")
            embeddings = model.embed_documents(["text1", "text2"])
        """
```
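
The docstring's selection logic translates to a short body; a sketch, assuming a `BioMedicalEmbedding` subclass with the same constructor signature:

```python
@staticmethod
def create_embedding_model(model_name: str) -> EmbeddingModel:
    # Constructor signatures are assumed; the excerpt above does not show them.
    name = model_name.lower()
    if "bio" in name or "biomed" in name:
        return BioMedicalEmbedding(model_name)       # clinical/biomedical models
    # "specter" and everything else use the SentenceTransformer wrapper
    return SentenceTransformerEmbedding(model_name)
```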
---

### **4. Vector Store (vector_store.py)**

#### **ChromaDBManager Class**
```python
import uuid
from typing import Dict, List, Optional

import chromadb
from chromadb.config import Settings

class ChromaDBManager:
    """Manages the ChromaDB vector database."""

    def __init__(self, persist_directory: str = "./chroma_db"):
        """
        Initialize the persistent vector database.

        Key: PersistentClient ensures data survives app restarts.

        Features:
        1. Create/use directory: ./chroma_db
        2. Initialize PersistentClient (not ephemeral)
        3. Disable anonymized telemetry
        4. Fall back to a regular Client if needed
        """
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.current_collection = None
        self.embedding_model = None

    def create_collection(self, collection_name: str,
                          embedding_model_name: str) -> chromadb.Collection:
        """
        Create a new vector collection.

        Process:
        1. Delete if it exists (avoid conflicts)
        2. Load the embedding model
        3. Create the ChromaDB collection with metadata
        4. Store a reference in self.current_collection

        Collection structure:
        {
            "name": "covidqa_dense_all_mpnet",
            "metadata": {
                "embedding_model": "all-mpnet-base-v2",
                "hnsw:space": "cosine"   # Similarity metric
            },
            "documents": [...],    # Document texts
            "embeddings": [...],   # Vector embeddings
            "metadatas": [...]     # Document metadata
        }
        """

    def add_documents(self, documents: List[str],
                      metadatas: Optional[List[Dict]] = None):
        """
        Add documents to the collection.

        Steps:
        1. Generate IDs if not provided: uuid.uuid4()
        2. Generate default metadata if not provided
        3. Process in batches (prevents memory issues)
        4. Embed each batch: self.embedding_model.embed_documents()
        5. Add to the collection: self.current_collection.add()

        Code flow:
            for batch in batches(documents, batch_size=100):
                embeddings = self.embedding_model.embed_documents(batch)
                self.current_collection.add(
                    ids=ids,
                    embeddings=embeddings,
                    documents=batch,
                    metadatas=metadatas
                )
        """

    def get_retrieved_documents(self, query: str,
                                n_results: int = 5) -> List[Dict]:
        """
        Retrieve similar documents.

        Retrieval process (using HNSW):
        1. Embed the query: embedding = embed_model.embed_query(query)
        2. Query the collection:
            results = collection.query(
                query_embeddings=[embedding],
                n_results=5,
                include=["documents", "metadatas", "distances"]
            )
        3. Format the results and return

        Return format:
        [
            {
                "document": "Document text...",
                "distance": 0.123,   # Lower = more similar
                "metadata": {...}
            },
            ...
        ]
        """
```
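
A minimal sketch of `get_retrieved_documents` following the docstring; `collection.query()` returns per-query lists, so the results are flattened into a list of dicts:

```python
def get_retrieved_documents(self, query: str, n_results: int = 5) -> List[Dict]:
    # Method sketch for ChromaDBManager
    # 1. Embed the query with the same model used at indexing time
    embedding = self.embedding_model.embed_query(query)
    # 2. Nearest-neighbour search in the current collection
    results = self.current_collection.query(
        query_embeddings=[embedding.tolist()],
        n_results=n_results,
        include=["documents", "metadatas", "distances"],
    )
    # 3. Flatten ChromaDB's per-query lists into a list of dicts
    return [
        {"document": doc, "distance": dist, "metadata": meta}
        for doc, dist, meta in zip(
            results["documents"][0],
            results["distances"][0],
            results["metadatas"][0],
        )
    ]
```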
---

### **5. Dataset Loader (dataset_loader.py)**
```python
from typing import Dict, List, Optional

from datasets import load_dataset, load_dataset_builder

class RAGBenchLoader:
    """Loads datasets from the HuggingFace rungalileo/ragbench collection."""

    SUPPORTED_DATASETS = [
        'covidqa', 'cuad', 'delucionqa', 'emanual',
        'expertqa', 'finqa', 'hagrid', 'hotpotqa',
        'msmarco', 'pubmedqa', 'tatqa', 'techqa'
    ]

    def load_dataset(self, dataset_name: str, split: str = "train",
                     max_samples: Optional[int] = None) -> List[Dict]:
        """
        Load a RAGBench dataset.

        Process:
        1. Validate the dataset name
        2. Load from HuggingFace: load_dataset("rungalileo/ragbench", dataset_name)
        3. Select max_samples if specified
        4. Process each item: _process_ragbench_item()
        5. Return a list of standardized dicts

        Result format:
        [
            {
                "question": "What is X?",
                "answer": "X is...",
                "documents": ["doc1", "doc2", ...],
                "context": "combined document text",
                "dataset": "covidqa"
            },
            ...
        ]

        Caching:
        - First load downloads ~100MB per dataset
        - Subsequent loads use the cache
        - Cache location: ./data_cache/
        """

    def get_test_data_size(self, dataset_name: str) -> int:
        """
        Get the number of available test samples without loading the full dataset.

        Efficient approach:
        1. builder = load_dataset_builder("rungalileo/ragbench", dataset_name)
        2. Check splits: builder.info.splits
        3. Return: builder.info.splits['test'].num_examples

        Benefit: fast metadata access (~1 second)
                 vs. a full load (~30 seconds)

        Fallback:
        - If the builder fails: load_dataset() and return len(ds)
        - On any other error: return 100 (a reasonable default)
        """
```
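
A sketch of `get_test_data_size` along the lines described; `load_dataset_builder` and the fallback both use standard `datasets` APIs:

```python
from datasets import load_dataset, load_dataset_builder

def get_test_data_size(self, dataset_name: str) -> int:
    # Method sketch for RAGBenchLoader
    try:
        # Metadata-only: fetches split info without downloading the data
        builder = load_dataset_builder("rungalileo/ragbench", dataset_name)
        return builder.info.splits["test"].num_examples
    except Exception:
        try:
            # Slower fallback: load the test split and count it
            ds = load_dataset("rungalileo/ragbench", dataset_name, split="test")
            return len(ds)
        except Exception:
            return 100  # reasonable default when everything else fails
```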
---

## Data Flow

### **Complete RAG Query Flow**
```
User types question in chat
        ↓
streamlit_app.py receives input
        ↓
RAGPipeline.query(question)
 ├─ STEP 1: RETRIEVAL
 │   ├─ embedding_model.embed_query(question)
 │   │   └─ "What is AI?" → [0.1, 0.2, 0.3, ...]
 │   │
 │   └─ vector_store.get_retrieved_documents(query_embedding, n_results=5)
 │       ├─ Search the ChromaDB collection
 │       └─ Return the top 5 similar documents
 │
 ├─ STEP 2: CONTEXT PREPARATION
 │   └─ Extract text from retrieved documents
 │
 ├─ STEP 3: GENERATION
 │   └─ llm_client.generate_with_context(question, doc_texts)
 │       ├─ Rate limiter checks (wait if needed)
 │       ├─ Send to Groq API:
 │       │    "Use context to answer: [docs...] Question: [q...]"
 │       └─ Return generated response
 │
 └─ STEP 4: RETURN RESULT
     └─ {"query": q, "response": r, "retrieved_documents": docs}
        ↓
Display in Streamlit UI
```
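
STEP 3 calls `generate_with_context`, which this guide references but never shows; a plausible sketch matching the prompt shape in the diagram (the exact prompt template is an assumption):

```python
from typing import List

def generate_with_context(self, query: str, doc_texts: List[str],
                          max_tokens: int = 1024) -> str:
    # Method sketch for GroqLLMClient: stitch retrieved chunks into one context block
    context = "\n\n".join(f"[Document {i+1}]\n{text}"
                          for i, text in enumerate(doc_texts))
    prompt = (
        "Use the following context to answer the question.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}"
    )
    # Delegate to the rate-limited generate() defined earlier
    return self.generate(prompt, max_tokens=max_tokens)
```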
---

### **Collection Creation Flow**
```
User configures in sidebar:
 ├─ Dataset: covidqa
 ├─ Embedding: all-mpnet-base-v2
 ├─ Chunking: dense
 └─ LLM: llama-3.1-8b-instant
        ↓
Click "Load Data & Create Collection"
        ↓
dataset_loader.load_dataset("covidqa", split="train", max_samples=100)
 ├─ Downloads the dataset from HuggingFace
 ├─ Processes 100 samples
 └─ Returns list of {"question", "answer", "documents", ...}
        ↓
chunking_strategy.chunk(documents, chunk_size=512, overlap=50)
 ├─ Split large docs into 512-token chunks
 ├─ Maintain a 50-token overlap for context
 └─ Returns list of chunks
        ↓
vector_store.load_dataset_into_collection()
 ├─ Create collection: "covidqa_dense_all_mpnet"
 ├─ For each chunk:
 │   ├─ embedding_model.embed(chunk)
 │   ├─ Generate UUID
 │   └─ Store in ChromaDB
 └─ Persist to ./chroma_db/ on disk
        ↓
Store references in Streamlit session state
        ↓
Ready for chat & evaluation!
```
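
The chunking step is the core of `chunking_strategies.py`; a minimal sketch of fixed-size chunking with overlap, using whitespace tokens as a stand-in for a real tokenizer:

```python
from typing import List

def chunk(text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]:
    """Split text into chunk_size-token windows that overlap by `overlap` tokens."""
    tokens = text.split()  # stand-in for a real tokenizer
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        chunks.append(" ".join(window))
        if start + chunk_size >= len(tokens):
            break  # final window reached the end of the document
    return chunks

# Example: a 1000-token document yields windows starting at tokens 0, 462, 924
```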
---

## Detailed Code Walkthroughs

### **Walkthrough 1: User Chats with RAG System**
```python
# File: streamlit_app.py - chat_interface()
import streamlit as st

# Step 1: User types a query
query = st.chat_input("Ask a question...")

if query:
    # Step 2: Display the user message
    with st.chat_message("user"):
        st.write(query)

    # Step 3: Call the RAG pipeline
    result = st.session_state.rag_pipeline.query(
        query,
        n_results=5  # Retrieve top 5 docs
    )
    # Inside RAGPipeline.query():
    # - retrieved_docs = vector_store.get_retrieved_documents()
    # - doc_texts = extract texts
    # - response = llm.generate_with_context(query, doc_texts)
    # - Store in chat_history
    # - Return {"query", "response", "retrieved_documents"}

    # Step 4: Display the response
    with st.chat_message("assistant"):
        st.write(result["response"])

    # Step 5: Show the retrieved documents
    with st.expander("📄 Retrieved Documents"):
        for i, doc in enumerate(result["retrieved_documents"]):
            st.markdown(f"**Doc {i+1}** - Distance: {doc['distance']:.4f}")
            st.text_area(f"Doc {i+1} text", value=doc["document"],
                         height=100, label_visibility="collapsed")

    # Step 6: Store in session history
    st.session_state.chat_history.append(result)
    st.rerun()
```
### **Walkthrough 2: Run Evaluation**
```python
# File: streamlit_app.py - run_evaluation()
import streamlit as st

# Step 1: Get test data
loader = RAGBenchLoader()
test_data = loader.get_test_data("covidqa", num_samples=10)
# Returns: [{"question": "Q1", "answer": "A1"}, ...]

# Step 2: Prepare test cases
test_cases = []
for sample in test_data:
    # Query the RAG system
    result = rag_pipeline.query(sample["question"], n_results=5)
    # Create a test case
    test_case = {
        "query": sample["question"],
        "response": result["response"],
        "retrieved_documents": [doc["document"] for doc in result["retrieved_documents"]],
        "ground_truth": sample.get("answer", "")
    }
    test_cases.append(test_case)

# Step 3: Run TRACE evaluation
evaluator = TRACEEvaluator()
results = evaluator.evaluate_batch(test_cases)
# Inside evaluate_batch():
#   for test_case in test_cases:
#       scores = evaluate(query, response, docs, ground_truth)
#       all_scores.append(scores)
#
#   avg_utilization = mean([s.utilization for s in all_scores])
#   avg_relevance = mean([s.relevance for s in all_scores])
#   avg_adherence = mean([s.adherence for s in all_scores])
#   avg_completeness = mean([s.completeness for s in all_scores])
#
#   return {
#       "utilization": avg_utilization,
#       "relevance": avg_relevance,
#       "adherence": avg_adherence,
#       "completeness": avg_completeness,
#       "average": mean of the 4 metrics,
#       "num_samples": 10,
#       "individual_scores": [scores for each sample]
#   }

# Step 4: Display results
st.metric("Utilization", f"{results['utilization']:.3f}")
st.metric("Relevance", f"{results['relevance']:.3f}")
st.metric("Adherence", f"{results['adherence']:.3f}")
st.metric("Completeness", f"{results['completeness']:.3f}")
```
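
A sketch of the aggregation described in the comments above, assuming each per-sample score is a dict of the four TRACE metrics (the real evaluator computes these scores first, then averages):

```python
from statistics import mean
from typing import Dict, List

METRICS = ["utilization", "relevance", "adherence", "completeness"]

def aggregate_scores(per_sample_scores: List[Dict[str, float]]) -> Dict:
    """Average per-sample TRACE scores into a batch summary."""
    summary = {m: mean(s[m] for s in per_sample_scores) for m in METRICS}
    summary["average"] = mean(summary[m] for m in METRICS)
    summary["num_samples"] = len(per_sample_scores)
    summary["individual_scores"] = per_sample_scores
    return summary

# Example:
# aggregate_scores([{"utilization": 0.8, "relevance": 0.9,
#                    "adherence": 0.7, "completeness": 0.6}, ...])
```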
---

## Key Classes & Methods

### **Session State Management (Streamlit)**
```python
# File: streamlit_app.py - initialization
import streamlit as st

# Session state persists values between reruns
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []
if "rag_pipeline" not in st.session_state:
    st.session_state.rag_pipeline = None
if "evaluation_results" not in st.session_state:
    st.session_state.evaluation_results = None
if "dataset_name" not in st.session_state:
    st.session_state.dataset_name = None

# Why session state?
# - Streamlit reruns the entire script on every interaction
# - Session state preserves data across reruns
# - Without it, chat history would reset after every message
```
### **Main UI Tabs**
```python
# File: streamlit_app.py
import streamlit as st

tab1, tab2, tab3 = st.tabs(["💬 Chat", "📊 Evaluation", "📜 History"])

with tab1:
    chat_interface()        # Conversational interface
with tab2:
    evaluation_interface()  # Run TRACE evaluation
with tab3:
    history_interface()     # View & export chat history
```
---

## Configuration System

### **How Settings Are Used**
```python
# config.py
settings = Settings()  # Loads from .env and defaults

# Usage in other files
from config import settings

# In llm_client.py
client = GroqLLMClient(
    api_key=settings.groq_api_key,
    model_name=settings.llm_models[0],  # or the model picked in the sidebar
    max_rpm=settings.groq_rpm_limit
)

# In vector_store.py
vector_store = ChromaDBManager(settings.chroma_persist_directory)

# In streamlit_app.py
dataset_options = st.selectbox("Choose Dataset", settings.ragbench_datasets)
```
---

## Summary: Code Architecture
```
┌─────────────────────────────────────────────────────────┐
│            STREAMLIT UI (streamlit_app.py)              │
│   ┌───────────┬───────────────┬───────────────┐         │
│   │ Chat Tab  │   Eval Tab    │  History Tab  │         │
│   └───────────┴───────────────┴───────────────┘         │
└───────────────────────┬─────────────────────────────────┘
                        │
           ┌────────────┼────────────┐
           ▼            ▼            ▼
    [RAG Pipeline] [TRACE Eval]  [History]
           │            │            │
           ▼            ▼            ▼
    [LLM Client]  [Vector Store] [Dataset Loader]
    ├─ Rate Limiter ├─ ChromaDB    ├─ RAGBench
    ├─ Groq API     ├─ Embedding   ├─ Process data
    └─ Generation   └─ Retrieval   └─ Cache
           │            │            │
           ▼            ▼            ▼
 [Embedding Models] [Chunking Strategies] [Config]
 ├─ SentenceTransformer ├─ Dense      ├─ API keys
 ├─ BioMedical BERT     ├─ Sparse     ├─ Models
 └─ Multiple options    ├─ Hybrid     ├─ Settings
                        └─ Re-ranking └─ Paths
```
---

## Quick Reference

| Component | Purpose | Key File |
|-----------|---------|----------|
| **Streamlit App** | User interface | streamlit_app.py |
| **RAG Pipeline** | Orchestrates the query flow | llm_client.py |
| **LLM Client** | Generates responses | llm_client.py |
| **Vector Store** | Stores & retrieves embeddings | vector_store.py |
| **Embeddings** | Converts text to vectors | embedding_models.py |
| **Datasets** | Loads RAGBench datasets | dataset_loader.py |
| **Chunking** | Splits documents | chunking_strategies.py |
| **Evaluation** | TRACE metrics | trace_evaluator.py |
| **Config** | Settings management | config.py |
---

This comprehensive guide covers the architecture, data flow, and key components of your RAG application! 🎉