# Week 6: RAG Pipeline & Parallel Agent Execution — Detailed Documentation > **Goal:** Give agents "peripheral vision" via RAG (Retrieval-Augmented Generation) and run all three agents concurrently with `asyncio.gather()`. > **Status:** Complete — Live-tested on PR #4 with RAG context and 3 parallel agents > **Date:** 2026-03-20 > **Test PR:** github.com/ninjacode911/codeguard-test/pull/4 > **Result:** RAG indexed 1 chunk, retrieved context, 3 agents ran in parallel in ~7 seconds (after model load) --- ## What We Built Week 6 adds two capabilities that transform Ninja Code Guard from a "look at the diff and guess" system into one that **understands the surrounding codebase** and **runs efficiently at scale**. 1. **RAG Pipeline** — Embeds repository source code into a vector database (ChromaDB), then retrieves semantically relevant code chunks and injects them into each agent's LLM prompt. This gives agents evidence about code they can't see in the diff alone. 2. **Parallel Agent Execution** — All three domain agents (Security, Performance, Style) now run concurrently via `asyncio.gather()`, reducing total review latency from the SUM of agent times to the MAX of agent times. ``` PR Webhook Received | v +--------------------------+ | 1. Fetch PR Data | GitHub API: diff + file contents | (Week 2) | +------------+-------------+ | v +--------------------------+ | 2. RAG: Index Files | NEW in Week 6 | chunk --> embed --> | sentence-transformers | store in ChromaDB | all-MiniLM-L6-v2 (384 dims) +------------+-------------+ | v +--------------------------+ | 3. RAG: Retrieve | NEW in Week 6 | embed query --> search | top-K nearest neighbors | --> filter by 0.3 | L2 distance to similarity +------------+-------------+ | rag_context string v +--------------------+--------------------+ | | | v v v +------------------+ +------------------+ +------------------+ | Security Agent | | Performance | | Style Agent | | (Bandit + | | Agent (Radon) | | (Ruff) | | detect-secrets) | | | | | | | | | | | | rag_context | | rag_context | | rag_context | | injected into | | injected into | | injected into | | prompt | | prompt | | prompt | +--------+---------+ +--------+---------+ +--------+---------+ | | | | asyncio.gather() -- all 3 run concurrently | | | v v v +----------------------------------------------------------+ | Merge Findings | | security_findings + performance_findings + style_findings | | Health Score = 100 - (critical*25) - (high*10) - ... | +----------------------------+-------------------------------+ | v +--------------------------+ | Post to GitHub | Inline comments + summary | Cache in Redis | +--------------------------+ ``` --- ## Concept 1: What is RAG (Retrieval-Augmented Generation)? ### The Problem: Diffs Are Not Enough When a developer opens a PR, the diff shows what CHANGED. But understanding whether a change is correct, safe, or performant often requires seeing code that DIDN'T change: ``` The PR adds this line: + result = db.execute(query, params) Questions the agent should ask: 1. What is db.execute()? Is it an ORM that parameterizes inputs, or raw SQL? --> Need to see the DB wrapper class (in another file) 2. Where does `query` come from? Is it user-controlled? --> Need to see the caller functions (in other files) 3. Are there other places in the codebase doing the same thing? --> Need semantic search across the entire repo 4. Is there middleware that validates the input before it reaches here? --> Need to see the request handling pipeline ``` Without RAG, the agent has to GUESS the answers to these questions. With RAG, the agent has EVIDENCE — actual code from the repository that it can reason about. ### The RAG Pipeline: Step by Step RAG has two phases: **indexing** (prepare the knowledge base) and **retrieval** (query it). ``` +------------------------------------------------------------------+ | INDEXING PHASE (once per PR review) | | | | Source Files ---> chunk_code() ---> embed_texts() ---> ChromaDB | | (from GitHub) 60-line chunks sentence-transformers upsert | | 10-line overlap all-MiniLM-L6-v2 | | 384-dimensional vectors | +------------------------------------------------------------------+ +------------------------------------------------------------------+ | RETRIEVAL PHASE (once per PR review) | | | | PR Diff ----> embed_texts() ----> ChromaDB query ----> Top-K | | (query) same model nearest neighbor formatted | | similarity search as context | | for LLM | +------------------------------------------------------------------+ ``` **In plain English:** We take all the files in the PR, chop them into small pieces, convert each piece into a list of numbers (a "vector") that captures its meaning, and store those vectors in a database. Then we take the PR diff, convert IT into a vector, and ask the database: "which code pieces are most similar to this diff?" The database returns the most relevant pieces, which we paste into the LLM's prompt alongside the diff. **Interview talking point:** "RAG gives our agents 'peripheral vision' — they see not just the changed lines, but semantically related code from across the repository. When a PR modifies a database query, RAG retrieves the DB wrapper class, validation middleware, and similar query patterns from other files. This dramatically reduces false positives because the agent can verify whether input is already sanitized elsewhere, rather than guessing." --- ## Concept 2: Embeddings — Turning Code Into Numbers ### What Is an Embedding? An embedding is a fixed-size list of numbers (a "vector") that captures the MEANING of a piece of text. Two pieces of text with similar meaning will have vectors that are close together in vector space, even if they use completely different words. ``` "connect to database" --> [0.23, -0.15, 0.87, 0.04, ...] --+ +-- Close together "establish DB connection" --> [0.21, -0.18, 0.85, 0.06, ...] --+ (high similarity) "print hello world" --> [-0.45, 0.72, -0.12, 0.33, ...] --- Far away (low similarity) ``` **How this differs from keyword search:** A keyword search for "database connection" would NOT match a code chunk containing `conn = sqlite3.connect("users.db")` — the words don't match. But embedding similarity WOULD match them, because the model understands that `sqlite3.connect` is semantically related to "database connection." ### Why all-MiniLM-L6-v2? We chose the `all-MiniLM-L6-v2` model from the sentence-transformers library. Here is why: | Property | Value | Why It Matters | |----------|-------|----------------| | Parameters | 22M | Small enough to run on CPU in production (Render free tier has no GPU) | | Dimensions | 384 | Good balance: enough dimensions to capture nuance, small enough for fast search. 768 or 1536 dims would be more precise but use more memory and slower retrieval | | Speed | ~10ms/chunk on CPU | Fast enough for real-time indexing during webhook processing. At 200 chunks, that's 2 seconds total | | Training data | Semantic textual similarity | Optimized for "do these texts mean the same thing?" — exactly what we need for finding related code | | Cost | Free, runs locally | No API calls, no rate limits, no vendor lock-in. Runs entirely in our Render process | | Download size | ~90 MB | Small enough that even cold-start download is manageable (though it takes ~56 seconds — see Bug section) | **Why not OpenAI's text-embedding-3-small or Cohere?** Those are arguably better at natural language, but they cost money per API call and add network latency. For code similarity — where the signal is in structure, function names, and identifiers rather than prose — MiniLM is good enough. The speed and cost advantage of running locally is significant when you're embedding 200 chunks per PR review. ### Shannon Entropy vs. Semantic Similarity These are two different ways to measure "interestingness" of a string: **Shannon entropy** (used by detect-secrets in Week 3) measures RANDOMNESS: - `"hello"` has entropy ~2.8 bits/char — predictable, not a secret - `"a3f8Kx9m2Q"` has entropy ~3.9 bits/char — random, probably a secret - It answers: "How unpredictable is this string?" — useful for finding API keys **Semantic similarity** (used by embeddings in Week 6) measures MEANING: - `"connect to database"` and `"establish DB connection"` have high similarity - It answers: "Do these texts mean the same thing?" — useful for finding related code They solve completely different problems. Entropy is a statistical measure of randomness. Similarity is a learned measure of semantic relatedness. **Interview talking point:** "We use Shannon entropy in detect-secrets to find API keys (high-entropy strings are likely secrets) and semantic embeddings in RAG to find related code (semantically similar chunks are likely relevant context). These are complementary techniques — entropy operates on individual strings, embeddings operate on meaning across entire code blocks." --- ## Concept 3: Code Chunking Strategy ### Why We Chunk The embedding model has a maximum input length (~256 tokens for MiniLM), and even within that limit, shorter inputs produce better embeddings. A 500-line file would produce a diluted embedding that weakly matches many topics. A 60-line function produces a focused embedding that strongly matches its specific topic. ### The chunk_code() Function — Walkthrough ```python def chunk_code(content: str, filepath: str, chunk_size: int = 60) -> list[dict]: """ Split source code into overlapping chunks for embedding. """ lines = content.split("\n") chunks = [] overlap = 10 # Lines shared between adjacent chunks start = 0 while start < len(lines): end = min(start + chunk_size, len(lines)) chunk_text = "\n".join(lines[start:end]) # Skip very small chunks (less than 5 non-empty lines) # WHY: A chunk of blank lines and comments has no semantic # content worth embedding. It would waste storage and produce # misleading similarity matches. non_empty = sum(1 for line in lines[start:end] if line.strip()) if non_empty >= 5: chunks.append({ "text": f"# File: {filepath}\n{chunk_text}", # ^^^^^^^^^^^^^^^^^ # Filepath prepended so the embedding model # "sees" the file path as part of the content. # A query about "database" will match chunks in # db/connection.py partly because of the filepath. "filepath": filepath, "start_line": start + 1, # 1-indexed for human readability "end_line": end, }) start += chunk_size - overlap # Move forward, but keep 10 lines of overlap return chunks ``` ### Why 60 Lines Per Chunk? This is the Goldilocks zone for code: ``` Too small (10 lines): def get_user(user_id): <-- Just the signature conn = sqlite3.connect( <-- No context about what happens next ... PROBLEM: Loses context. A function signature without its body is useless for understanding behavior. Too large (200 lines): class UserService: <-- Database logic def get_user(...): ... def update_user(...): ... <-- Authentication logic def delete_user(...): ... <-- Logging logic def validate(...): ... ... PROBLEM: Dilutes the embedding signal. A 200-line chunk about "database queries AND logging AND error handling" will weakly match all three topics instead of strongly matching one. Just right (60 lines = ~one function/class): def get_user(user_id): conn = sqlite3.connect("users.db") query = "SELECT * FROM users WHERE id = ?" return conn.execute(query, (user_id,)).fetchone() ... GOOD: Captures a single concept well. The embedding strongly represents "database query for user lookup" and will match queries about DB access. ``` ### Why 10 Lines of Overlap? Without overlap, a function that spans lines 55-70 would be split across two chunks: ``` Without overlap: With 10-line overlap: Chunk 1: lines 1-60 Chunk 1: lines 1-60 Chunk 2: lines 61-120 Chunk 2: lines 51-110 ^^^^^^^^ overlap zone (lines 51-60) Function at lines 55-70: Function at lines 55-70: Chunk 1 has lines 55-60 Chunk 1 has lines 55-60 (partial) Chunk 2 has lines 61-70 Chunk 2 has lines 51-70 (COMPLETE!) NEITHER chunk has the Chunk 2 has the full function. complete function! ``` **The trade-off:** Overlap means ~17% more chunks (and therefore ~17% more embedding computation and storage). For a 200-chunk file, that is 34 extra chunks — a worthwhile trade for context integrity. ### Why Skip Chunks with <5 Non-Empty Lines? A chunk that is mostly blank lines, comments, or whitespace has no meaningful semantic content. Embedding it would: 1. Waste ChromaDB storage space 2. Produce misleading similarity matches (blank chunks might match other blank chunks) 3. Add noise to the retrieval results The threshold of 5 is deliberately conservative — even a short function like `def add(a, b): return a + b` with some surrounding context will pass. **Interview talking point:** "Our chunking strategy uses 60-line windows with 10-line overlap, tuned for the natural granularity of source code — roughly one function or class per chunk. The overlap ensures functions spanning chunk boundaries remain complete in at least one chunk. We skip near-empty chunks to avoid polluting the vector store with semantically meaningless content. The filepath is prepended to each chunk so the embedding model can use it as a semantic signal — queries about 'database' will naturally match chunks from files in the db/ directory." --- ## Concept 4: ChromaDB — Embedded Vector Database ### What ChromaDB Is ChromaDB is an open-source **vector database** that stores embeddings alongside the original documents and metadata. Unlike Postgres or Redis (which store rows or key-value pairs), ChromaDB is optimized for **similarity search** — "find the 5 stored items most similar to this query." The key differentiator: ChromaDB runs **embedded in the Python process**. No separate server, no Docker container, no network calls, no infrastructure to manage. You `pip install chromadb` and call `chromadb.Client()`. ### Why In-Memory Mode? We use `chromadb.Client()` (in-memory, no persistence) instead of `chromadb.PersistentClient(path="./data")` because Render's free tier has **ephemeral storage** — files on disk are lost whenever the service restarts. This means the vector index is rebuilt on every PR review. Is that acceptable? ``` Indexing cost per PR review: Typical PR: 5-20 changed files, 50-200 code chunks Embedding time: ~10ms per chunk x 200 chunks = ~2 seconds ChromaDB upsert time: ~100ms total Total indexing overhead: ~2 seconds Verdict: Acceptable. The LLM calls take 3-7 seconds each. 2 seconds of indexing is a small fraction of total review time. ``` In a production system with persistent storage (paid Render tier, AWS ECS, etc.), you would use `PersistentClient` so the index survives restarts and only needs incremental updates. ### Collection-Per-Repo Pattern Each GitHub repository gets its own ChromaDB collection. This provides natural isolation — code from `repo-A` doesn't contaminate retrieval results for `repo-B`. ```python def _collection_name(repo_full_name: str) -> str: """Generate a valid ChromaDB collection name from a repo name. ChromaDB collection names must be: - 3-63 characters long - Alphanumeric + underscores only (no slashes, no hyphens) GitHub repo names like "ninjacode911/code-guard-test" violate both rules. """ # "ninjacode911/code-guard-test" --> "repo_ninjacode911_code_guard_test" name = repo_full_name.replace("/", "_").replace("-", "_") return f"repo_{name}"[:63] # Enforce max length with slice ``` This sanitizer was born from Bug #3 (see Bugs section) — ChromaDB silently rejected invalid names with an opaque error message that took an hour to debug. ### Upsert for Idempotent Indexing We use `collection.upsert()` instead of `collection.add()`. The difference: | Operation | If ID exists | If ID doesn't exist | |-----------|-------------|---------------------| | `add()` | Raises an error (duplicate) | Inserts new document | | `upsert()` | Updates the existing document | Inserts new document | **Why this matters:** When a developer pushes a fix to the same PR, we re-review it. The same files get indexed again. With `upsert`, re-indexing just overwrites the old vectors instead of creating duplicates or crashing. The ID format `filepath:start_line` (e.g., `"app.py:1"`, `"app.py:51"`) ensures each chunk position is unique within a collection. ```python # Upsert into ChromaDB ids = [f"{chunk['filepath']}:{chunk['start_line']}" for chunk in all_chunks] # Examples: ["app.py:1", "app.py:51", "utils.py:1", "utils.py:51"] collection.upsert( ids=ids, # Unique ID per chunk embeddings=embeddings, # 384-dimensional vectors documents=texts, # Original code text (for returning in results) metadatas=metadatas, # filepath, start_line, end_line (for display) ) ``` ### Why ChromaDB Over Alternatives? | Vector DB | Pros | Cons | Our Choice | |-----------|------|------|------------| | **ChromaDB** | Embedded (no server), Python-native, simple API | Limited scale (~1M vectors) | **Yes** — simplicity wins for MVP | | Pinecone | Managed, scalable, fast | Requires API key, costs money, vendor lock-in | No | | pgvector | Uses existing Postgres | Requires DB setup, slower queries | Maybe later for production | | FAISS | Facebook's library, very fast | No metadata storage, manual management | No — too low-level | | Weaviate | Full-featured, GraphQL API | Heavy, requires Docker or cloud | No — overkill | **Interview talking point:** "We use ChromaDB in embedded mode — it runs inside the Python process with zero infrastructure. The trade-off is in-memory only storage on Render's free tier, so we rebuild the index on each review. This is acceptable because indexing 10-20 files takes under 2 seconds. Each repo gets its own collection identified by a sanitized version of the GitHub repo name, and we use upsert semantics to handle re-indexing gracefully without duplicates." --- ## Concept 5: Retrieval — Finding Relevant Code ### How Similarity Search Works When we embed the PR diff and query ChromaDB, the database performs **approximate nearest neighbor (ANN) search**. In simplified terms: ``` Step 1: Embed the query (PR diff) "def get_user(user_id):\n query = f'SELECT...'" | v embed_texts() --> [0.34, -0.21, 0.76, ...] (384 numbers) Step 2: Compare against all stored vectors Stored chunk 1 (db/connection.py): distance = 0.42 (close!) Stored chunk 2 (auth/middleware.py): distance = 0.87 (somewhat close) Stored chunk 3 (utils/logging.py): distance = 1.95 (far away) Stored chunk 4 (db/models.py): distance = 0.55 (close) Stored chunk 5 (tests/test_app.py): distance = 2.31 (very far) Step 3: Return top-K by distance (K=5) Result: [chunk1, chunk2, chunk4, ...] sorted by relevance ``` ### L2 Distance to Similarity Conversion ChromaDB uses **L2 (Euclidean) distance** by default. Lower distance = more similar. But humans think in terms of "similarity" (higher = more similar), so we convert: ```python # ChromaDB returns L2 distance -- lower = more similar # Convert to 0-1 similarity score -- higher = more similar similarity = max(0, 1 - distance / 2) ``` **Why `distance / 2`?** For normalized embeddings (which MiniLM produces), L2 distance ranges from 0 (identical) to 2 (maximally different). Dividing by 2 normalizes to 0-1, then subtracting from 1 inverts the scale so 1 = identical and 0 = unrelated. ### Why Filter by Similarity Threshold (0.3)? Without filtering, ChromaDB ALWAYS returns top-K results — even if they're completely irrelevant. In a small collection with only 3 chunks, ALL three will be returned even if none are related to the query. ```python if similarity < 0.3: continue # Skip low-relevance results ``` **Why 0.3?** This threshold was chosen empirically: - **Above 0.7:** Very high confidence — the chunk is clearly about the same topic - **0.3 to 0.7:** Moderate relevance — may contain useful context - **Below 0.3:** Likely noise — including it would confuse the LLM more than help it Setting it too high (0.7) would miss useful-but-not-exact matches. Setting it too low (0.1) would include irrelevant code that wastes LLM context tokens and might cause hallucinations. ### The Query Cap: 5000 Characters ```python query_embeddings = embed_texts([query_text[:5000]]) # Cap query size ``` **Why cap the query?** The PR diff for a 100-file refactoring could be 50,000 characters. Embedding all of it would: 1. Dilute the semantic signal — a query about "everything" matches nothing well 2. Exceed the embedding model's effective context window 3. Be slow (embedding time scales with input length) The first 5000 characters typically capture the most important changes (the primary files modified, the core logic changes). Later changes are often test updates, import fixes, or boilerplate that don't help with retrieval. ### How Retrieved Context Is Formatted The retriever formats results as Markdown that the LLM can parse: ``` ## Related Code Context (from repository) ### app/db/connection.py (lines 1-60, relevance: 78%) ``` class DatabaseConnection: def execute(self, query, params=None): return self.cursor.execute(query, params) ``` ### app/middleware/auth.py (lines 20-80, relevance: 65%) ``` def validate_user_id(user_id): if not isinstance(user_id, int): raise ValueError("Invalid user ID") ``` ``` Each chunk includes: - **Filepath** — so the LLM knows WHERE the code lives - **Line range** — so the LLM can reference it precisely - **Relevance score** — so the LLM can weight high-relevance chunks more **Interview talking point:** "The retriever converts L2 distance to a similarity score and filters below 0.3 to prevent noise. We cap the query at 5000 characters because embedding the entire diff of a 100-file PR would dilute the semantic signal. Retrieved context is formatted with filepath and relevance scores so the LLM can weight its relevance appropriately." --- ## Concept 6: asyncio.gather() — Parallel Agent Execution ### Why Parallel Execution Matters: The Latency Math Each agent makes an HTTP call to Groq's API and waits for the response. If we run them sequentially, we wait for each one to finish before starting the next: ``` Sequential execution (BEFORE): Security Agent: ################ 5.2s Performance Agent: ################ 4.8s Style Agent: ################ 3.5s Total: ================================================ 13.5s SUM of all three Parallel execution (AFTER): Security Agent: ################ 5.2s Performance Agent: ################ 4.8s <-- running simultaneously Style Agent: ################ 3.5s <-- running simultaneously Total: ================ 5.2s MAX of the three Speedup: 13.5s --> 5.2s = 2.6x faster ``` This is not a marginal improvement. For the developer waiting for the review, 13.5 seconds feels annoyingly slow. 5.2 seconds feels responsive. ### How Python Async Works: Event Loop and Coroutines `asyncio` is NOT multithreading. It runs on a **single thread** using **cooperative multitasking**. The key insight: our agents spend 95% of their time WAITING for Groq's HTTP response. During that wait, the CPU is idle. asyncio uses that idle time to run other tasks. ``` How asyncio.gather() executes 3 agent reviews: Time Event Loop Activity ---- --------------------------------------------------- 0ms Start Security Agent --> sends HTTP request to Groq 1ms CPU is FREE (waiting for network) --> start Performance Agent 2ms CPU is FREE --> start Style Agent 3ms All 3 HTTP requests are "in flight" simultaneously ... ... (waiting for Groq to respond -- CPU is idle) ... 3500ms Groq responds to Style Agent --> resume, process result 4800ms Groq responds to Performance Agent --> resume, process result 5200ms Groq responds to Security Agent --> resume, process result 5200ms asyncio.gather() returns all 3 results ``` **Important:** This works because the bottleneck is **network I/O** (waiting for Groq), not **CPU computation**. While waiting for a network response, the CPU has nothing to do — asyncio fills that idle time with other coroutines. ### Coroutines vs. Threads vs. Processes | Approach | Overhead | Best For | Python Limitation | |----------|----------|----------|-------------------| | `asyncio` (coroutines) | Minimal (~few KB per task) | I/O-bound work (HTTP calls, DB queries) | Single thread, cooperative | | `threading` | ~8 MB per thread stack | I/O-bound work with blocking libraries | GIL prevents true CPU parallelism | | `multiprocessing` | Full process (~30 MB+) | CPU-bound work (ML inference, math) | IPC overhead, no shared memory | **Why asyncio for our agents:** Each agent is I/O-bound (waiting for Groq's API). asyncio has near-zero overhead per coroutine and avoids the GIL contention that threads suffer from. If we were doing CPU-intensive work (like running the embedding model on 1000 chunks), we would use multiprocessing instead. ### The gather() + Graceful Degradation Pattern Here is the actual code in `main.py`: ```python # Create all three agents security_agent = SecurityAgent() performance_agent = PerformanceAgent() style_agent = StyleAgent() # Run all three concurrently -- total time = max(agent times), not sum security_findings, performance_findings, style_findings = await asyncio.gather( security_agent.review(pr_data, rag_context), performance_agent.review(pr_data, rag_context), style_agent.review(pr_data, rag_context), ) # Merge results from all agents findings = security_findings + performance_findings + style_findings ``` **The graceful degradation part:** Each agent handles its own exceptions internally (in `BaseAgent.review()`). If one agent fails — Groq times out, the model returns invalid JSON, the static tool crashes — it catches the exception and returns `[]` (empty list). This means `asyncio.gather()` NEVER sees an exception. All three calls always "succeed." ``` What happens if Performance Agent's Groq call times out: asyncio.gather( security_agent.review(...) --> returns [finding1, finding2] OK performance_agent.review(...) --> catches exception, returns [] FAILED GRACEFULLY style_agent.review(...) --> returns [finding3, finding4] OK ) # Result: [finding1, finding2] + [] + [finding3, finding4] # = 4 findings from 2 agents # Better than crashing the entire pipeline! ``` **Why not use `asyncio.gather(return_exceptions=True)`?** That would return the Exception object in the results list instead of raising it. But we don't need it — our agents already handle exceptions internally. Using `return_exceptions=True` would complicate the calling code (need to check if each result is a list or an Exception) for no benefit. **Interview talking point:** "We run all three agents concurrently using `asyncio.gather()`, which reduces total latency from the sum of agent times to the maximum — a 2.6x speedup in practice. This works because each agent is I/O-bound (waiting for the Groq API), not CPU-bound, so asyncio's cooperative multitasking uses the idle wait time to service other agents. Each agent handles exceptions internally, so a single agent failure doesn't crash the pipeline — the remaining agents' findings are still posted." --- ## Concept 7: Integration into base_agent.py — The rag_context Parameter ### What Changed The `review()` method in `BaseAgent` was updated to accept an optional `rag_context` parameter: ```python # BEFORE (Week 3): async def review(self, pr_data: PRData) -> list[Finding]: # AFTER (Week 6): async def review(self, pr_data: PRData, rag_context: str = "") -> list[Finding]: # ^^^^^^^^^^^^^^^^^^^^ # New parameter with empty default ``` ### How RAG Context Reaches the LLM The prompt template was updated to include a `{rag_context}` placeholder: ```python def _build_prompt(self) -> ChatPromptTemplate: return ChatPromptTemplate.from_messages([ ("system", self.system_prompt), ("human", ( "## PR Diff\n" "```diff\n{diff}\n```\n\n" "## Changed File Contents\n" "{file_contents}\n\n" "## Static Analysis Results\n" "{static_analysis}\n\n" "{rag_context}\n\n" # <-- RAG context injected here "Analyze this PR and return your findings as structured JSON." )), ]) ``` And in the `review()` method, the context is passed through: ```python result = await chain.ainvoke({ "diff": pr_data.diff[:15000], "file_contents": self._format_file_contents(pr_data.file_contents), "static_analysis": static_results or "No static analysis results.", "rag_context": rag_context or "", # <-- Injected here }) ``` ### Why rag_context Defaults to Empty String This design decision embodies the **graceful degradation** principle: 1. **If RAG fails** (model not loaded, ChromaDB error, no relevant chunks found) — the agents still work, they just have less context. The LLM prompt simply has an empty string where the RAG context would be. 2. **In tests** — we don't need to mock the entire RAG pipeline. Just call `agent.review(pr_data)` without the second argument. 3. **Backward compatibility** — existing code that calls `review(pr_data)` without `rag_context` continues to work without modification. This follows a pattern sometimes called **"fail-open"** in security contexts: RAG is an enhancement, not a requirement. Reviews still work without it — they're just less informed. --- ## Concept 8: Integration into main.py — The Full Updated Pipeline ### The Complete Flow Here is how everything connects in the `_process_pr_review` function: ```python async def _process_pr_review(repo_full_name, pr_number, commit_sha, installation_id): """Background task: fetch PR data and post a review.""" # --- Step 1: Fetch PR data (Week 2) --- client = GitHubClient(installation_id) pr_data = await client.fetch_pr_data(repo_full_name, pr_number) # --- Step 2: RAG — Index files into ChromaDB (Week 6 NEW) --- rag_context = "" try: collection_name = await index_repo_files( repo_full_name, pr_data.file_contents ) # --- Step 3: RAG — Retrieve relevant context (Week 6 NEW) --- rag_context = await retrieve_context( collection_name, pr_data.diff[:5000] ) except Exception as rag_err: logger.warning("RAG context unavailable", error=str(rag_err)) # Continue without RAG — fail-open pattern # --- Step 4: Run 3 agents in parallel (Week 6 NEW) --- security_agent = SecurityAgent() performance_agent = PerformanceAgent() style_agent = StyleAgent() security_findings, performance_findings, style_findings = await asyncio.gather( security_agent.review(pr_data, rag_context), performance_agent.review(pr_data, rag_context), style_agent.review(pr_data, rag_context), ) # --- Step 5: Merge findings and compute health score --- findings = security_findings + performance_findings + style_findings # ... health score calculation, SynthesizedReview construction ... # --- Step 6: Post to GitHub --- # ... inline comments with fallback to summary comment ... # --- Step 7: Cache in Redis --- await mark_as_reviewed(commit_sha) ``` ### What Changed from Previous Weeks | Component | Before Week 6 | After Week 6 | |-----------|--------------|-------------| | RAG | Not present | Index files --> embed --> store --> query --> retrieve | | Agent execution | Sequential: `findings = await security_agent.review(pr_data)` | Parallel: `asyncio.gather(agent1.review(...), agent2.review(...), agent3.review(...))` | | Agent review() | `review(pr_data)` — one argument | `review(pr_data, rag_context)` — two arguments | | LLM prompt | diff + files + static analysis | diff + files + static analysis + RAG context | | Error handling | Agent-level only | Agent-level + RAG-level (try/except around RAG pipeline) | ### The try/except Around the RAG Pipeline Notice that the entire RAG block (index + retrieve) is wrapped in a try/except: ```python rag_context = "" try: collection_name = await index_repo_files(...) rag_context = await retrieve_context(...) except Exception as rag_err: logger.warning("RAG context unavailable", error=str(rag_err)) ``` This means if ANYTHING goes wrong with RAG — sentence-transformers not installed, ChromaDB crashes, embedding model returns garbage — the pipeline continues with `rag_context = ""`. The agents receive an empty string for RAG context and proceed with diff + files + static analysis only. This is the fail-open pattern applied at the pipeline level. --- ## Code Walkthroughs ### embedder.py — The Embedding Pipeline **File:** `app/context/embedder.py` This file has three responsibilities: 1. Lazy-load the sentence-transformers model 2. Convert text to embeddings 3. Chunk source code into embeddable pieces ```python # Lazy-loaded model to avoid slow import at startup _model = None def get_embedding_model(): """ Lazy-load the sentence-transformers model. We load on first use (not at import time) because: 1. The model takes ~2 seconds to load from cache (~56s cold download) 2. Not every request needs embeddings (cached reviews skip this) 3. Tests shouldn't load a real ML model — they mock embed_texts() """ global _model if _model is None: try: from sentence_transformers import SentenceTransformer # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Import is INSIDE the function, not at module top. # This means importing embedder.py is instant. # The heavy SentenceTransformer import only happens # when someone actually calls get_embedding_model(). _model = SentenceTransformer(settings.embedding_model) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # settings.embedding_model = "all-MiniLM-L6-v2" # On first call, this downloads ~90MB from HuggingFace. # Subsequent calls use the cached model (~2s load). except ImportError: logger.warning("sentence-transformers not installed -- RAG disabled") return None return _model ``` ```python def embed_texts(texts: list[str]) -> list[list[float]]: """Convert text strings to 384-dimensional vectors.""" model = get_embedding_model() if model is None: return [] # Graceful degradation if model unavailable embeddings = model.encode(texts, show_progress_bar=False) # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ # Batch encoding: more efficient than encoding one at a time # because the model can process multiple inputs in a single # forward pass through the neural network. return embeddings.tolist() # ^^^^^^^^^^^^^^^^^^^ # Convert from NumPy array to Python list (ChromaDB expects lists) ``` ### indexer.py — The ChromaDB Indexer **File:** `app/context/indexer.py` ```python async def index_repo_files(repo_full_name, file_contents): client = _get_chroma_client() # Singleton ChromaDB client collection_name = _collection_name(repo_full_name) # Sanitize name # Get or create a collection for THIS repo (isolation between repos) collection = client.get_or_create_collection( name=collection_name, metadata={"repo": repo_full_name}, ) # Chunk all files, skipping files > 100KB (likely binary/generated) all_chunks = [] for filepath, content in file_contents.items(): if len(content) > 100_000: continue # Skip huge files chunks = chunk_code(content, filepath) all_chunks.extend(chunks) # Safety: cap total chunks to avoid OOM on Render's 512MB RAM max_chunks = settings.max_repo_files_index # Default: 500 if len(all_chunks) > max_chunks: all_chunks = all_chunks[:max_chunks] # Batch embed all chunks (one call to the model) texts = [chunk["text"] for chunk in all_chunks] embeddings = embed_texts(texts) # Upsert: insert or update (idempotent for re-indexing) ids = [f"{chunk['filepath']}:{chunk['start_line']}" for chunk in all_chunks] collection.upsert(ids=ids, embeddings=embeddings, documents=texts, metadatas=metadatas) return collection_name # Passed to retriever for querying ``` ### retriever.py — The RAG Retriever **File:** `app/context/retriever.py` ```python async def retrieve_context(collection_name, query_text, top_k=5): try: client = _get_chroma_client() # If collection doesn't exist, there's nothing to retrieve try: collection = client.get_collection(name=collection_name) except Exception: return "" # No index yet -- proceed without RAG if collection.count() == 0: return "" # Empty collection -- nothing to search # Embed the query using the SAME model used for indexing # (critical: mismatched models would produce incompatible vectors) query_embeddings = embed_texts([query_text[:5000]]) # Nearest neighbor search results = collection.query( query_embeddings=query_embeddings, n_results=min(top_k, collection.count()), include=["documents", "metadatas", "distances"], ) # Format results, filtering by relevance context_parts = ["## Related Code Context (from repository)\n"] for doc, metadata, distance in zip( results["documents"][0], results["metadatas"][0], results["distances"][0], ): similarity = max(0, 1 - distance / 2) # L2 --> 0-1 similarity if similarity < 0.3: continue # Skip irrelevant results context_parts.append( f"### {filepath} (lines {start}-{end}, relevance: {similarity:.0%})\n" f"```\n{doc}\n```\n" ) if len(context_parts) == 1: # Only the header, no actual results return "" return "\n".join(context_parts) except Exception as e: logger.warning("RAG retrieval failed", error=str(e)) return "" # Fail-open: agents work without RAG ``` --- ## Live Test Results: PR #4 ### RAG in Action ``` Webhook received -- PR #4, sha=a1b2c3d4 [Step 1] Fetched PR data: 1 file, 1 with content [Step 2] Chunking: 1 file --> 1 chunk (file was < 60 lines) [Step 3] Embedding: 1 chunk --> [0.23, -0.15, 0.87, ...] (384 dims) [Step 4] ChromaDB upsert: 1 chunk stored in collection "repo_ninjacode911_codeguard_test" [Step 5] Query: embedded PR diff, searched ChromaDB [Step 6] Retrieved: 1 relevant chunk (relevance: 72%) [Step 7] Injected RAG context into all 3 agent prompts [Step 8] asyncio.gather: 3 agents started concurrently [Step 9] All agents completed in ~7 seconds (after model load) ``` ### The Cold Start Problem First PR review after deployment: ``` [00.0s] Webhook received [56.2s] sentence-transformers model downloaded from HuggingFace (COLD START) [56.8s] Model loaded, embedding started [57.0s] Indexing complete (1 chunk) [57.2s] Retrieval complete (1 chunk returned) [64.0s] All 3 agents completed [64.5s] Posted to GitHub Total: ~64 seconds (56s model download + 8s actual work) ``` Second PR review (model cached): ``` [00.0s] Webhook received [02.0s] Model loaded from cache [02.2s] Indexing complete [02.4s] Retrieval complete [09.0s] All 3 agents completed [09.5s] Posted to GitHub Total: ~9 seconds (2s model load + 7s actual work) ``` The 56-second cold start is addressed by the pre-warm cron job from Week 1, which hits the `/health` endpoint periodically to keep the service warm. In a future iteration, we could trigger model pre-loading on the `/health` endpoint itself. --- ## Bugs Encountered and Fixed ### Bug 1: sentence-transformers Cold Start (~56 seconds) **Symptom:** First PR review after deployment took 70+ seconds instead of ~9 seconds. **Cause:** `SentenceTransformer("all-MiniLM-L6-v2")` downloads the model from HuggingFace Hub on first use (~56 seconds on Render's network). Subsequent loads use the local cache (~2 seconds). **Fix:** Lazy loading pattern — the model is only loaded when `embed_texts()` is first called, not at import time. Combined with the pre-warm cron (Week 1), the first real PR review always hits a warm model cache. ```python _model = None def get_embedding_model(): global _model if _model is None: from sentence_transformers import SentenceTransformer _model = SentenceTransformer(settings.embedding_model) return _model ``` **Why not pre-load at server startup?** Because the server needs to respond to Render's health check within seconds of starting. If we blocked startup for 56 seconds, Render would think the service crashed and kill it. ### Bug 2: ChromaDB Collection Name Validation **Symptom:** `ValueError` when creating a ChromaDB collection. **Cause:** ChromaDB collection names must be 3-63 characters, containing only alphanumeric characters and underscores. GitHub repo names like `ninjacode911/code-guard-test` contain slashes and hyphens — both rejected by ChromaDB with an opaque error message. **Fix:** The `_collection_name()` sanitizer replaces invalid characters: ```python def _collection_name(repo_full_name: str) -> str: name = repo_full_name.replace("/", "_").replace("-", "_") return f"repo_{name}"[:63] ``` **Lesson:** Always validate inputs at system boundaries. ChromaDB's error message was `"Expected collection name to match..."` without specifying which characters were invalid. --- ## Tests Written (Week 6) ### test_rag_pipeline.py — 10 Tests | Test | What It Verifies | |------|-----------------| | `test_small_file_single_chunk` | File < 60 lines produces exactly 1 chunk | | `test_large_file_multiple_chunks` | 150-line file produces 2+ overlapping chunks | | `test_chunk_includes_filepath_in_text` | `# File: src/utils/helper.py` appears in chunk text | | `test_skips_nearly_empty_chunks` | Chunks with < 5 non-empty lines are filtered out | | `test_chunk_metadata_has_line_numbers` | start_line=1, end_line=30, overlap starts at 21 | | `test_converts_repo_name_to_valid_collection` | Slashes and hyphens replaced, `repo_` prefix | | `test_truncates_long_names` | Collection names capped at 63 characters | | `test_index_repo_files_returns_collection_name` | Indexing returns valid collection name | | `test_index_handles_empty_files` | Empty file dict does not crash | | `test_index_skips_large_files` | Files > 100KB excluded from embedding | ### test_parallel_agents.py — 6 Tests | Test | What It Verifies | |------|-----------------| | `test_all_agents_have_unique_names` | `{"security", "performance", "style"}` are distinct | | `test_all_agents_load_prompts` | All 3 prompts load without filesystem errors | | `test_prompts_are_domain_specific` | Security has "CWE", Performance has "N+1", Style has "naming" | | `test_prompts_have_scope_boundaries` | Each prompt says "do not comment on" other domains | | `test_gather_runs_concurrently` | 3 x 0.1s tasks complete in < 0.25s (not 0.3s) | | `test_gather_handles_partial_failure` | One failing task returns `[]`, others return results | ### Total: 16 New Tests Across 2 Files **Test design decisions:** - **Embeddings are mocked** — We mock `embed_texts()` to return `[[0.1] * 384]` instead of loading the real model. Without mocking, every test run would wait 2-56 seconds for model loading, making the test suite impractically slow. - **ChromaDB is NOT mocked** — We use the real in-memory ChromaDB client in tests. It's fast (milliseconds), deterministic, and requires no setup. Mocking it would hide integration issues between our code and ChromaDB's API. - **Parallel execution is tested with asyncio.sleep()** — We verify that `asyncio.gather()` runs tasks concurrently by timing them: three 0.1-second sleeps should complete in ~0.1s (parallel) not ~0.3s (sequential). --- ## Files Created/Modified in Week 6 | File | Type | Purpose | |------|------|---------| | `app/context/embedder.py` | **New** | Embedding pipeline: lazy model loading, embed_texts(), chunk_code() | | `app/context/indexer.py` | **New** | ChromaDB indexer: collection-per-repo, upsert semantics, chunk limits | | `app/context/retriever.py` | **New** | RAG retriever: similarity search, threshold filtering, context formatting | | `app/agents/base_agent.py` | **Modified** | Added `rag_context` parameter to `review()` and `{rag_context}` to prompt template | | `app/main.py` | **Modified** | Added RAG pipeline (index + retrieve) and `asyncio.gather()` for 3 parallel agents | | `tests/unit/test_rag_pipeline.py` | **New** | 10 tests for chunking, indexing, retrieval | | `tests/unit/test_parallel_agents.py` | **New** | 6 tests for agent identity and concurrent execution | --- ## Dependencies Added | Package | Purpose | |---------|---------| | `sentence-transformers>=3.3.0` | Local embedding model (all-MiniLM-L6-v2, 22M params, 384 dims) | | `chromadb>=0.5.0` | In-memory vector database for storing and searching embeddings | --- ## Architecture Patterns Used (Interview Reference) | Pattern | Where Used | What It Means | Why It Matters | |---------|------------|---------------|----------------| | **RAG (Retrieval-Augmented Generation)** | embedder + indexer + retriever | External knowledge injected into LLM prompt | Agents see related code beyond the diff, reducing false positives | | **Lazy Loading** | embedder.py (`_model = None`) | Resource initialized on first use, not at import time | Avoids 56-second cold-start penalty on every import | | **Singleton** | embedder.py, indexer.py (`_chroma_client`) | Module-level global ensures exactly one instance | One embedding model, one ChromaDB client — no redundant memory | | **Fail-Open** | retriever.py, main.py | If RAG fails, agents proceed without context | RAG is an enhancement, not a requirement — reviews still work without it | | **Concurrent Execution** | main.py (`asyncio.gather()`) | Multiple I/O-bound tasks run on one thread cooperatively | 2.6x latency reduction (5s instead of 15s) | | **Graceful Degradation** | base_agent.py (`return []` on error) | Failures return empty results instead of crashing | One agent failing doesn't kill the other agents' findings | | **Upsert Semantics** | indexer.py (`collection.upsert()`) | Insert-or-update prevents duplicate entries | Re-indexing same file on re-review is idempotent | | **Input Sanitization** | indexer.py (`_collection_name()`) | Clean external input before passing to storage | GitHub repo names contain characters ChromaDB rejects | | **Overlap Chunking** | embedder.py (10-line overlap) | Adjacent chunks share boundary lines | Functions spanning chunk boundaries remain complete in at least one chunk | --- ## Key Interview Talking Points Summary 1. **RAG for Code Review:** "RAG gives our agents 'peripheral vision' beyond the diff. When reviewing a database query change, RAG retrieves the DB wrapper class, validation middleware, and similar patterns from across the repository. We use sentence-transformers for local embeddings (no API cost, ~10ms per chunk) and ChromaDB as an embedded vector store (no infrastructure)." 2. **Embeddings:** "We use all-MiniLM-L6-v2 — a 22-million parameter model that produces 384-dimensional vectors. It runs on CPU in ~10ms per chunk, which is fast enough for real-time indexing during webhook processing. Unlike keyword search, embeddings capture semantic meaning — a query about 'database connection' matches code containing `sqlite3.connect()` even though the words are different." 3. **Chunking Strategy:** "We chunk code into 60-line blocks with 10-line overlap. Sixty lines is roughly one function — the natural semantic unit of code. The overlap ensures that functions spanning chunk boundaries are complete in at least one chunk. We skip near-empty chunks to avoid polluting the vector store." 4. **ChromaDB Choice:** "ChromaDB runs embedded in the Python process — zero infrastructure. We accept the trade-off of in-memory storage because Render's free tier has ephemeral disk, and rebuilding the index takes under 2 seconds for typical PRs. Each repo gets its own collection for isolation, and upsert semantics make re-indexing idempotent." 5. **Parallel Execution:** "We run all three agents concurrently with asyncio.gather(). Since each agent is I/O-bound (waiting for the Groq API), asyncio's cooperative multitasking overlaps the wait times. Total latency is max(agent times) not sum — a 2.6x speedup. Each agent handles exceptions internally, so one failure doesn't crash the others." 6. **Fail-Open Design:** "Every component in the RAG pipeline can fail without crashing the system. If the embedding model fails to load, agents work without RAG context. If ChromaDB throws an error, the try/except in main.py catches it and continues. If one agent's LLM call times out, the other two agents' findings are still posted. We always prefer partial results over total failure." --- ## Cumulative Test Count | Week | New Tests | Cumulative Total | |------|-----------|-----------------| | Week 1 | 8 (schema validation) | 8 | | Week 2 | 12 (webhook + cache) | 20 | | Week 3 | 15 (security agent + tools + formatter) | 35 | | Week 4 | 8 (performance agent + radon) | 43 | | Week 5 | 9 (style agent + ruff) | 52 | | **Week 6** | **16 (RAG pipeline + parallel agents)** | **68** | --- *Documentation written 2026-03-20 as part of Week 6 completion.*