
RAG Capstone Project - Code Architecture & Walkthrough

Table of Contents

  1. Project Structure
  2. Core Components
  3. Data Flow
  4. Detailed Code Walkthroughs
  5. Key Classes & Methods
  6. Configuration System

Project Structure

RAG Capstone Project/
├── streamlit_app.py           # Main UI application
├── api.py                     # FastAPI backend (optional)
├── llm_client.py              # Groq LLM integration
├── vector_store.py            # ChromaDB management
├── dataset_loader.py          # RAGBench dataset loading
├── embedding_models.py        # Embedding model factory
├── chunking_strategies.py     # Document chunking
├── trace_evaluator.py         # Evaluation metrics
├── config.py                  # Configuration settings
├── requirements.txt           # Dependencies
└── chroma_db/                 # Persistent vector store

Core Components

1. Configuration (config.py)

from pydantic import BaseSettings  # in Pydantic v2 this import moves to the pydantic-settings package

class Settings(BaseSettings):
    """Central configuration management using Pydantic."""
    
    # API Configuration
    groq_api_key: str = ""                    # Groq API key
    groq_rpm_limit: int = 30                  # Requests per minute
    rate_limit_delay: float = 2.0             # Delay between requests
    
    # Storage
    chroma_persist_directory: str = "./chroma_db"
    
    # Available Models
    embedding_models: list = [                # 8 embedding options
        "sentence-transformers/all-mpnet-base-v2",
        "emilyalsentzer/Bio_ClinicalBERT",
        "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract",
        "sentence-transformers/all-MiniLM-L6-v2",
        "sentence-transformers/multilingual-MiniLM-L12-v2",
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        "allenai/specter",
        "gemini-embedding-001"
    ]
    
    llm_models: list = [                      # 3 LLM options
        "meta-llama/llama-4-maverick-17b-128e-instruct",
        "llama-3.1-8b-instant",
        "openai/gpt-oss-120b"
    ]
    
    chunking_strategies: list = [             # 4 chunking strategies
        "dense", "sparse", "hybrid", "re-ranking"
    ]
    
    ragbench_datasets: list = [               # 12 RAGBench datasets
        "covidqa", "cuad", "delucionqa", "emanual",
        "expertqa", "finqa", "hagrid", "hotpotqa",
        "msmarco", "pubmedqa", "tatqa", "techqa"
    ]

Key Features:

  • ✅ Pydantic validation
  • ✅ Loads from .env file (sample below)
  • ✅ Centralized settings management
  • ✅ Easy to extend
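
A minimal .env sketch. With BaseSettings, field names map to upper-cased environment variables by default; the values here are placeholders, not real credentials:

# .env
GROQ_API_KEY=gsk_your_key_here
GROQ_RPM_LIMIT=30
RATE_LIMIT_DELAY=2.0
CHROMA_PERSIST_DIRECTORY=./chroma_db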

2. LLM Client (llm_client.py)

A. Rate Limiter Class

import time
from collections import deque

class RateLimiter:
    """Prevents API rate limit violations."""
    
    def __init__(self, max_requests_per_minute: int = 30):
        self.max_requests = max_requests_per_minute  # 30 requests/minute for Groq by default
        self.request_times = deque()                 # Track request timestamps
    
    def acquire_sync(self):
        """
        Synchronous rate limiting:
        
        Flow:
        1. Remove requests older than 1 minute
        2. If at limit: calculate wait time
        3. Sleep for wait time
        4. Record this request
        
        Example:
        - At 00:00 make 30 requests
        - At 00:05 try 31st request
        - Wait time = 60 - 5 = 55 seconds
        """

Why needed?

  • Groq API has 30 requests/minute limit
  • Prevents rate limit errors
  • Handles multiple concurrent requests gracefully
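
The walkthrough above elides the body of acquire_sync; a minimal sketch following the four documented steps:

    def acquire_sync(self):
        now = time.time()
        # 1. Remove requests older than 1 minute
        while self.request_times and now - self.request_times[0] >= 60:
            self.request_times.popleft()
        # 2-3. If at the limit, sleep until the oldest request ages out
        if len(self.request_times) >= self.max_requests:
            wait_time = 60 - (now - self.request_times[0])
            if wait_time > 0:
                time.sleep(wait_time)
        # 4. Record this request
        self.request_times.append(time.time())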

B. GroqLLMClient Class

from groq import Groq

class GroqLLMClient:
    """Main LLM interface using Groq API."""
    
    def __init__(self, api_key: str, model_name: str, max_rpm: int = 30,
                 rate_limit_delay: float = 2.0):
        self.client = Groq(api_key=api_key)      # Groq API client
        self.model_name = model_name             # Selected model
        self.rate_limiter = RateLimiter(max_rpm) # Rate limiting
        self.rate_limit_delay = rate_limit_delay # Extra pause used in generate()
        
    def generate(self, prompt: str, max_tokens: int = 1024,
                 system_prompt: str = "You are a helpful assistant.") -> str:  # default is a placeholder
        """
        Generate text from prompt:
        
        Execution Flow:
        1. rate_limiter.acquire_sync()  # Wait if needed
        2. self.client.chat.completions.create()  # Call Groq API
        3. time.sleep(rate_limit_delay)  # Additional delay
        4. Return response.choices[0].message.content
        
        Code:
        """
        self.rate_limiter.acquire_sync()
        
        messages = [{"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}]
        
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=0.7
        )
        
        time.sleep(self.rate_limit_delay)
        return response.choices[0].message.content
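
generate_with_context, used by the pipeline below, is not shown here; a plausible sketch that joins the retrieved chunks into the prompt (the exact prompt template is an assumption based on the data-flow diagram further down):

    def generate_with_context(self, query: str, context_docs: List[str],
                              max_tokens: int = 1024) -> str:
        # Concatenate retrieved chunks and delegate to generate()
        context = "\n\n".join(context_docs)
        prompt = f"Use the following context to answer.\n\nContext:\n{context}\n\nQuestion: {query}"
        return self.generate(prompt, max_tokens=max_tokens)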

C. RAGPipeline Class

class RAGPipeline:
    """Orchestrates the complete RAG workflow."""
    
    def __init__(self, llm_client: GroqLLMClient, 
                 vector_store_manager: ChromaDBManager):
        self.llm_client = llm_client
        self.vector_store = vector_store_manager
        self.chat_history = []
    
    def query(self, query: str, n_results: int = 5) -> Dict:
        """
        Execute RAG Query:
        
        Step 1: RETRIEVAL
        ─────────────────
        retrieved_docs = vector_store.get_retrieved_documents(query, n_results=5)
        
        Step 2: CONTEXT AUGMENTATION
        ────────────────────────────
        doc_texts = [doc["document"] for doc in retrieved_docs]
        
        Step 3: GENERATION
        ──────────────────
        response = llm.generate_with_context(query, doc_texts)
        
        Step 4: HISTORY
        ───────────────
        chat_history.append({"query": query, "response": response})
        
        Return: {
            "query": "What is AI?",
            "response": "Generated answer...",
            "retrieved_documents": [
                {
                    "document": "AI is...",
                    "distance": 0.123,
                    "metadata": {...}
                },
                ...
            ]
        }
        """

3. Embedding Models (embedding_models.py)

Model Types

import numpy as np
from typing import List

class EmbeddingModel:
    """Base class for all embedding models."""
    
    def embed_documents(self, texts: List[str]) -> np.ndarray:
        """Convert texts to vectors (embeddings)."""
        raise NotImplementedError
    
    def embed_query(self, query: str) -> np.ndarray:
        """Convert query to vector."""
        return self.embed_documents([query])[0]


from sentence_transformers import SentenceTransformer

class SentenceTransformerEmbedding(EmbeddingModel):
    """Uses pre-trained transformer models from HuggingFace."""
    
    def load_model(self):
        """
        Load SentenceTransformer:
        
        What it does:
        1. Downloads model from HuggingFace
        2. Loads to GPU (if available) or CPU
        3. Sets to eval mode (no dropout)
        
        Example:
        model = SentenceTransformer("all-mpnet-base-v2")
        """
        self.model = SentenceTransformer(self.model_name, device=self.device)
    
    def embed_documents(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """
        Batch embed documents:
        
        Process:
        1. Split texts into batches (32 texts at a time)
        2. For each batch: self.model.encode(batch)
        3. Stack all embeddings
        4. Return numpy array
        
        Efficiency: Batching prevents memory overflow
        """
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            embeddings.append(self.model.encode(batch, convert_to_numpy=True))
        return np.vstack(embeddings)
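
Example usage (the constructor signature is an assumption; the class clearly stores model_name and device, as load_model shows):

model = SentenceTransformerEmbedding(model_name="sentence-transformers/all-MiniLM-L6-v2")
model.load_model()
vectors = model.embed_documents(["first text", "second text"])  # shape (2, 384) for this model
query_vec = model.embed_query("a question")                     # shape (384,)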

Factory Pattern

class EmbeddingFactory:
    """Creates appropriate embedding model."""
    
    @staticmethod
    def create_embedding_model(model_name: str) -> EmbeddingModel:
        """
        Automatically select model type:
        
        Logic:
        - If "Bio" or "Biomed" in name β†’ BioMedicalEmbedding
        - If "specter" β†’ SentenceTransformerEmbedding
        - Otherwise β†’ SentenceTransformerEmbedding (default)
        
        Usage:
        model = EmbeddingFactory.create_embedding_model("all-mpnet-base-v2")
        embeddings = model.embed_documents(["text1", "text2"])
        """

4. Vector Store (vector_store.py)

ChromaDBManager Class

import chromadb
from chromadb.config import Settings  # ChromaDB's own Settings, not the app's config.Settings

class ChromaDBManager:
    """Manages ChromaDB vector database."""
    
    def __init__(self, persist_directory: str = "./chroma_db"):
        """
        Initialize persistent vector database:
        
        Key: PersistentClient ensures data survives app restarts
        
        Features:
        1. Create/use directory: ./chroma_db
        2. Initialize PersistentClient (not ephemeral)
        3. Enable telemetry anonymization
        4. Fallback to regular Client if needed
        """
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        self.current_collection = None
        self.embedding_model = None
    
    def create_collection(self, collection_name: str, 
                         embedding_model_name: str) -> chromadb.Collection:
        """
        Create new vector collection:
        
        Process:
        1. Delete if exists (avoid conflicts)
        2. Load embedding model
        3. Create ChromaDB collection with metadata
        4. Store reference in self.current_collection
        
        Collection Structure:
        {
            "name": "wiki_qa_dense_all_mpnet",
            "metadata": {
                "embedding_model": "all-mpnet-base-v2",
                "hnsw:space": "cosine"  # Similarity metric
            },
            "documents": [...],          # Document texts
            "embeddings": [...],         # Vector embeddings
            "metadatas": [...]           # Document metadata
        }
        """
    
    def add_documents(self, documents: List[str], 
                     metadatas: Optional[List[Dict]] = None):
        """
        Add documents to collection:
        
        Steps:
        1. Generate IDs if not provided: uuid.uuid4()
        2. Generate default metadata if not provided
        3. Process in batches (prevents memory issues)
        4. Embed each batch: self.embedding_model.embed_documents()
        5. Add to collection: self.current_collection.add()
        
        Code Flow:
        for batch in batches(documents, batch_size=100):
            embeddings = self.embedding_model.embed_documents(batch)
            self.current_collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=batch,
                metadatas=metadatas
            )
        """
    
    def get_retrieved_documents(self, query: str, 
                               n_results: int = 5) -> List[Dict]:
        """
        Retrieve similar documents:
        
        Retrieval Process (using HNSW):
        1. Embed query: embedding = embed_model.embed_query(query)
        2. Query collection: results = collection.query(
               query_embeddings=[embedding],
               n_results=5,
               include=["documents", "metadatas", "distances"]
           )
        3. Format results and return
        
        Return Format:
        [
            {
                "document": "Document text...",
                "distance": 0.123,  # Lower = more similar
                "metadata": {...}
            },
            ...
        ]
        """

5. Dataset Loader (dataset_loader.py)

class RAGBenchLoader:
    """Loads datasets from Hugging Face rungalileo/ragbench."""
    
    SUPPORTED_DATASETS = [
        'covidqa', 'cuad', 'delucionqa', 'emanual',
        'expertqa', 'finqa', 'hagrid', 'hotpotqa',
        'msmarco', 'pubmedqa', 'tatqa', 'techqa'
    ]
    
    def load_dataset(self, dataset_name: str, split: str = "train",
                    max_samples: Optional[int] = None) -> List[Dict]:
        """
        Load RAGBench dataset:
        
        Process:
        1. Validate dataset name
        2. Load from HuggingFace: load_dataset("rungalileo/ragbench", dataset_name)
        3. Select max_samples if specified
        4. Process each item: _process_ragbench_item()
        5. Return list of standardized dicts
        
        Result Format:
        [
            {
                "question": "What is X?",
                "answer": "X is...",
                "documents": ["doc1", "doc2", ...],
                "context": "combined document text",
                "dataset": "wiki_qa"
            },
            ...
        ]
        
        Caching:
        - First load downloads ~100MB per dataset
        - Subsequent loads use cache
        - Cache location: ./data_cache/
        """
    
    def get_test_data_size(self, dataset_name: str) -> int:
        """
        Get available test samples without loading full dataset:
        
        Efficient Approach:
        1. builder = load_dataset_builder("rungalileo/ragbench", dataset_name)
        2. Check splits: builder.info.splits
        3. Return: builder.info.splits['test'].num_examples
        
        Benefit: Fast metadata access (~1 second)
        vs. Full load (~30 seconds)
        
        Fallback:
        - If builder fails: load_dataset() and return len(ds)
        - If error: return 100 (reasonable default)
        """

Data Flow

Complete RAG Query Flow

User Types Question in Chat
        ↓
streamlit_app.py receives input
        ↓
RAGPipeline.query(question)
        ├─ STEP 1: RETRIEVAL
        │   ├─ embedding_model.embed_query(question)
        │   │   └─ "What is AI?" → [0.1, 0.2, 0.3, ...]
        │   │
        │   └─ vector_store.get_retrieved_documents(query_embedding, n_results=5)
        │       ├─ Search in ChromaDB collection
        │       └─ Return top 5 similar documents
        │
        ├─ STEP 2: CONTEXT PREPARATION
        │   └─ Extract text from retrieved documents
        │
        ├─ STEP 3: GENERATION
        │   └─ llm_client.generate_with_context(question, doc_texts)
        │       ├─ Rate limiter checks (wait if needed)
        │       ├─ Send to Groq API:
        │       │   "Use context to answer: [docs...] Question: [q...]"
        │       └─ Return generated response
        │
        └─ STEP 4: RETURN RESULT
            └─ {"query": q, "response": r, "retrieved_documents": docs}
        ↓
Display in Streamlit UI

Collection Creation Flow

User configures in sidebar:
├─ Dataset: covidqa
├─ Embedding: all-mpnet-base-v2
├─ Chunking: dense
└─ LLM: llama-3.1-8b-instant
        ↓
Click "Load Data & Create Collection"
        ↓
dataset_loader.load_dataset("covidqa", split="train", max_samples=100)
├─ Downloads dataset from HuggingFace
├─ Processes 100 samples
└─ Returns list of {"question", "answer", "documents", ...}
        ↓
chunking_strategy.chunk(documents, chunk_size=512, overlap=50)
├─ Split large docs into 512-token chunks
├─ Maintain 50-token overlap for context
└─ Returns list of chunks
        ↓
vector_store.load_dataset_into_collection()
├─ Create collection: "covidqa_dense_all_mpnet"
├─ For each chunk:
│   ├─ embedding_model.embed(chunk)
│   ├─ Generate UUID
│   └─ Store in ChromaDB
└─ Persist to ./chroma_db/ on disk
        ↓
Store references in Streamlit session state
        ↓
Ready for chat & evaluation!
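
The chunking step above is a sliding window over each document; a minimal word-based sketch (the project counts tokens, so sizes here are approximate, and the real strategies live in chunking_strategies.py):

def chunk(text: str, chunk_size: int = 512, overlap: int = 50) -> list:
    # Slide a window of chunk_size words, stepping by chunk_size - overlap
    # so consecutive chunks share `overlap` words of context
    words = text.split()
    step = chunk_size - overlap
    return [" ".join(words[i:i + chunk_size])
            for i in range(0, max(len(words) - overlap, 1), step)]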

Detailed Code Walkthroughs

Walkthrough 1: User Chats with RAG System

# File: streamlit_app.py - chat_interface()

# Step 1: User types query
query = st.chat_input("Ask a question...")

if query:
    # Step 2: Display user message
    with st.chat_message("user"):
        st.write(query)
    
    # Step 3: Call RAG pipeline
    result = st.session_state.rag_pipeline.query(
        query,
        n_results=5  # Retrieve top 5 docs
    )
    # Inside RAGPipeline.query():
    #   - retrieved_docs = vector_store.get_retrieved_documents()
    #   - doc_texts = extract texts
    #   - response = llm.generate_with_context(query, doc_texts)
    #   - Store in chat_history
    #   - Return {"query", "response", "retrieved_documents"}
    
    # Step 4: Display response
    with st.chat_message("assistant"):
        st.write(result["response"])
    
    # Step 5: Show retrieved documents
    with st.expander("📄 Retrieved Documents"):
        for i, doc in enumerate(result["retrieved_documents"]):
            st.markdown(f"**Doc {i+1}** - Distance: {doc['distance']:.4f}")
            st.text_area(f"Doc {i+1}", value=doc["document"], height=100,
                         key=f"retrieved_doc_{i}", label_visibility="collapsed")
    
    # Step 6: Store in session history
    st.session_state.chat_history.append(result)
    st.rerun()

Walkthrough 2: Run Evaluation

# File: streamlit_app.py - run_evaluation()

# Step 1: Get test data
loader = RAGBenchLoader()
test_data = loader.get_test_data("covidqa", num_samples=10)
# Returns: [{"question": "Q1", "answer": "A1"}, ...]

# Step 2: Prepare test cases
test_cases = []
for sample in test_data:
    # Query RAG system
    result = rag_pipeline.query(sample["question"], n_results=5)
    
    # Create test case
    test_case = {
        "query": sample["question"],
        "response": result["response"],
        "retrieved_documents": [doc["document"] for doc in result["retrieved_documents"]],
        "ground_truth": sample.get("answer", "")
    }
    test_cases.append(test_case)

# Step 3: Run TRACE evaluation
evaluator = TRACEEvaluator()
results = evaluator.evaluate_batch(test_cases)

# Inside evaluate_batch():
#   for test_case in test_cases:
#       scores = evaluate(query, response, docs, ground_truth)
#       all_scores.append(scores)
#   
#   avg_utilization = mean([s.utilization for s in all_scores])
#   avg_relevance = mean([s.relevance for s in all_scores])
#   avg_adherence = mean([s.adherence for s in all_scores])
#   avg_completeness = mean([s.completeness for s in all_scores])
#   
#   return {
#       "utilization": avg_utilization,
#       "relevance": avg_relevance,
#       "adherence": avg_adherence,
#       "completeness": avg_completeness,
#       "average": average of 4 metrics,
#       "num_samples": 10,
#       "individual_scores": [scores for each sample]
#   }

# Step 4: Display results
st.metric("Utilization", f"{results['utilization']:.3f}")
st.metric("Relevance", f"{results['relevance']:.3f}")
st.metric("Adherence", f"{results['adherence']:.3f}")
st.metric("Completeness", f"{results['completeness']:.3f}")

Key Classes & Methods

Session State Management (Streamlit)

# File: streamlit_app.py - initialization

# Session state stores state between reruns
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

if "rag_pipeline" not in st.session_state:
    st.session_state.rag_pipeline = None

if "evaluation_results" not in st.session_state:
    st.session_state.evaluation_results = None

if "dataset_name" not in st.session_state:
    st.session_state.dataset_name = None

# Why session state?
# - Streamlit reruns entire script on every interaction
# - Session state preserves data across reruns
# - Without it: chat history would reset after every message

Main UI Tabs

# File: streamlit_app.py

tab1, tab2, tab3 = st.tabs(["💬 Chat", "📊 Evaluation", "📜 History"])

with tab1:
    chat_interface()          # Conversational interface
    
with tab2:
    evaluation_interface()    # Run TRACE evaluation

with tab3:
    history_interface()       # View & export chat history

Configuration System

How Settings Are Used

# config.py
settings = Settings()  # Loads from .env and defaults

# Usage in other files
from config import settings

# In llm_client.py
client = GroqLLMClient(
    api_key=settings.groq_api_key,
    max_rpm=settings.groq_rpm_limit
)

# In vector_store.py
vector_store = ChromaDBManager(settings.chroma_persist_directory)

# In streamlit_app.py
dataset_options = st.selectbox("Choose Dataset", settings.ragbench_datasets)

Summary: Code Architecture

┌─────────────────────────────────────────────────────────┐
│         STREAMLIT UI (streamlit_app.py)                 │
│  ┌───────────┬──────────────┬──────────────┐            │
│  │ Chat Tab  │ Eval Tab     │ History Tab  │            │
│  └───────────┴──────────────┴──────────────┘            │
└────────────────────┬────────────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        │            │            │
        ▼            ▼            ▼
    [RAG Pipeline] [TRACE Eval] [History]
        │            │            │
        ├────────────┼────────────┤
        │            │            │
    ┌───▼──────────────────────┬──┴────────────────┐
    │                          │                   │
    ▼                          ▼                   ▼
[LLM Client]          [Vector Store]       [Dataset Loader]
  ├─ Rate Limiter       ├─ ChromaDB           ├─ RAGBench
  ├─ Groq API           ├─ Embedding          ├─ Process data
  └─ Generation         └─ Retrieval          └─ Cache

      ┌─────────────────────┬──────────────────┐
      │                     │                  │
      ▼                     ▼                  ▼
  [Embedding Models] [Chunking Strategies] [Config]
   ├─ SentenceTransformer  ├─ Dense        ├─ API Keys
   ├─ BioMedical BERT      ├─ Sparse       ├─ Models
   └─ Multiple options     ├─ Hybrid       ├─ Settings
                           └─ Re-ranking   └─ Paths

Quick Reference

Component       Purpose                          Key File
─────────────   ──────────────────────────────   ──────────────────────
Streamlit App   User interface                   streamlit_app.py
RAG Pipeline    Orchestrates query flow          llm_client.py
LLM Client      Generates responses              llm_client.py
Vector Store    Stores & retrieves embeddings    vector_store.py
Embeddings      Converts text to vectors         embedding_models.py
Datasets        Loads RAGBench datasets          dataset_loader.py
Chunking        Splits documents                 chunking_strategies.py
Evaluation      TRACE metrics                    trace_evaluator.py
Config          Settings management              config.py

This comprehensive guide covers the architecture, data flow, and key components of your RAG application! 🚀