
Week 6: RAG Pipeline & Parallel Agent Execution - Detailed Documentation

Goal: Give agents "peripheral vision" via RAG (Retrieval-Augmented Generation) and run all three agents concurrently with asyncio.gather().

Status: Complete. Live-tested on PR #4 with RAG context and 3 parallel agents.

Date: 2026-03-20

Test PR: github.com/ninjacode911/codeguard-test/pull/4

Result: RAG indexed 1 chunk, retrieved context, 3 agents ran in parallel in ~7 seconds (after model load)


What We Built

Week 6 adds two capabilities that transform Ninja Code Guard from a "look at the diff and guess" system into one that understands the surrounding codebase and runs efficiently at scale.

  1. RAG Pipeline: Embeds repository source code into a vector database (ChromaDB), then retrieves semantically relevant code chunks and injects them into each agent's LLM prompt. This gives agents evidence about code they can't see in the diff alone.

  2. Parallel Agent Execution: All three domain agents (Security, Performance, Style) now run concurrently via asyncio.gather(), reducing total review latency from the SUM of agent times to the MAX of agent times.

                         PR Webhook Received
                                |
                                v
                    +--------------------------+
                    |   1. Fetch PR Data       |  GitHub API: diff + file contents
                    |      (Week 2)            |
                    +------------+-------------+
                                 |
                                 v
                    +--------------------------+
                    |   2. RAG: Index Files    |  NEW in Week 6
                    |   chunk --> embed -->    |  sentence-transformers
                    |   store in ChromaDB     |  all-MiniLM-L6-v2 (384 dims)
                    +------------+-------------+
                                 |
                                 v
                    +--------------------------+
                    |   3. RAG: Retrieve       |  NEW in Week 6
                    |   embed query --> search |  top-K nearest neighbors
                    |   --> filter by 0.3      |  L2 distance to similarity
                    +------------+-------------+
                                 | rag_context string
                                 v
            +--------------------+--------------------+
            |                    |                    |
            v                    v                    v
   +------------------+ +------------------+ +------------------+
   | Security Agent   | | Performance      | | Style Agent      |
   | (Bandit +        | | Agent (Radon)    | | (Ruff)           |
   | detect-secrets)  | |                  | |                  |
   |                  | |                  | |                  |
   |   rag_context    | |   rag_context    | |   rag_context    |
   |   injected into  | |   injected into  | |   injected into  |
   |   prompt         | |   prompt         | |   prompt         |
   +--------+---------+ +--------+---------+ +--------+---------+
            |                    |                    |
            |   asyncio.gather() -- all 3 run concurrently
            |                    |                    |
            v                    v                    v
   +----------------------------------------------------------+
   |                 Merge Findings                             |
   |  security_findings + performance_findings + style_findings |
   |  Health Score = 100 - (critical*25) - (high*10) - ...      |
   +----------------------------+-------------------------------+
                                |
                                v
                    +--------------------------+
                    |   Post to GitHub         |  Inline comments + summary
                    |   Cache in Redis         |
                    +--------------------------+

Concept 1: What is RAG (Retrieval-Augmented Generation)?

The Problem: Diffs Are Not Enough

When a developer opens a PR, the diff shows what CHANGED. But understanding whether a change is correct, safe, or performant often requires seeing code that DIDN'T change:

The PR adds this line:
    + result = db.execute(query, params)

Questions the agent should ask:
    1. What is db.execute()? Is it an ORM that parameterizes inputs, or raw SQL?
       --> Need to see the DB wrapper class (in another file)
    2. Where does `query` come from? Is it user-controlled?
       --> Need to see the caller functions (in other files)
    3. Are there other places in the codebase doing the same thing?
       --> Need semantic search across the entire repo
    4. Is there middleware that validates the input before it reaches here?
       --> Need to see the request handling pipeline

Without RAG, the agent has to GUESS the answers to these questions. With RAG, the agent has EVIDENCE: actual code from the repository that it can reason about.

The RAG Pipeline: Step by Step

RAG has two phases: indexing (prepare the knowledge base) and retrieval (query it).

+------------------------------------------------------------------+
|                   INDEXING PHASE (once per PR review)              |
|                                                                    |
|  Source Files ---> chunk_code() ---> embed_texts() ---> ChromaDB  |
|  (from GitHub)    60-line chunks    sentence-transformers upsert   |
|                   10-line overlap   all-MiniLM-L6-v2              |
|                                     384-dimensional vectors        |
+------------------------------------------------------------------+

+------------------------------------------------------------------+
|                  RETRIEVAL PHASE (once per PR review)              |
|                                                                    |
|  PR Diff ----> embed_texts() ----> ChromaDB query ----> Top-K     |
|  (query)       same model          nearest neighbor     formatted  |
|                                    similarity search    as context |
|                                                         for LLM   |
+------------------------------------------------------------------+

In plain English: We take all the files in the PR, chop them into small pieces, convert each piece into a list of numbers (a "vector") that captures its meaning, and store those vectors in a database. Then we take the PR diff, convert IT into a vector, and ask the database: "which code pieces are most similar to this diff?" The database returns the most relevant pieces, which we paste into the LLM's prompt alongside the diff.

Interview talking point: "RAG gives our agents 'peripheral vision': they see not just the changed lines, but semantically related code from across the repository. When a PR modifies a database query, RAG retrieves the DB wrapper class, validation middleware, and similar query patterns from other files. This dramatically reduces false positives because the agent can verify whether input is already sanitized elsewhere, rather than guessing."


Concept 2: Embeddings - Turning Code Into Numbers

What Is an Embedding?

An embedding is a fixed-size list of numbers (a "vector") that captures the MEANING of a piece of text. Two pieces of text with similar meaning will have vectors that are close together in vector space, even if they use completely different words.

"connect to database"    -->  [0.23, -0.15, 0.87, 0.04, ...]   --+
                                                                   +-- Close together
"establish DB connection" -->  [0.21, -0.18, 0.85, 0.06, ...]   --+
                                                                       (high similarity)
"print hello world"       -->  [-0.45, 0.72, -0.12, 0.33, ...]  --- Far away
                                                                       (low similarity)

How this differs from keyword search: A keyword search for "database connection" would NOT match a code chunk containing conn = sqlite3.connect("users.db"), because the words don't match. But embedding similarity WOULD match them, because the model understands that sqlite3.connect is semantically related to "database connection."
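To make "close together in vector space" concrete, here is a minimal sketch that computes cosine similarity in plain Python. The four-component vectors are only the truncated illustrative values from the diagram above, not real model output (a real all-MiniLM-L6-v2 embedding has 384 components), and `cosine_similarity` is a helper written for this example.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


# Illustrative 4-dimensional stand-ins for the vectors in the diagram.
connect_to_database = [0.23, -0.15, 0.87, 0.04]
establish_db_connection = [0.21, -0.18, 0.85, 0.06]
print_hello_world = [-0.45, 0.72, -0.12, 0.33]

close = cosine_similarity(connect_to_database, establish_db_connection)
far = cosine_similarity(connect_to_database, print_hello_world)
print(f"similar phrases:   {close:.3f}")
print(f"unrelated phrases: {far:.3f}")
```

The two database phrases score close to 1, while the unrelated phrase scores below 0, which is the numeric version of "close together" versus "far away".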

Why all-MiniLM-L6-v2?

We chose the all-MiniLM-L6-v2 model from the sentence-transformers library. Here is why:

| Property | Value | Why It Matters |
|---|---|---|
| Parameters | 22M | Small enough to run on CPU in production (Render free tier has no GPU) |
| Dimensions | 384 | Good balance: enough dimensions to capture nuance, small enough for fast search. 768 or 1536 dims could capture more nuance but would use more memory and slow down retrieval |
| Speed | ~10ms/chunk on CPU | Fast enough for real-time indexing during webhook processing. At 200 chunks, that's 2 seconds total |
| Training data | Semantic textual similarity | Optimized for "do these texts mean the same thing?", which is exactly what we need for finding related code |
| Cost | Free, runs locally | No API calls, no rate limits, no vendor lock-in. Runs entirely in our Render process |
| Download size | ~90 MB | Small enough that even cold-start download is manageable (though it takes ~56 seconds; see Bug section) |

Why not OpenAI's text-embedding-3-small or Cohere? Those are arguably better at natural language, but they cost money per API call and add network latency. For code similarity, where the signal is in structure, function names, and identifiers rather than prose, MiniLM is good enough. The speed and cost advantage of running locally is significant when you're embedding 200 chunks per PR review.

Shannon Entropy vs. Semantic Similarity

These are two different ways to measure "interestingness" of a string:

Shannon entropy (used by detect-secrets in Week 3) measures RANDOMNESS:

  • "hello" has entropy ~1.9 bits/char: predictable, not a secret
  • "a3f8Kx9m2Q" has entropy ~3.3 bits/char: random, probably a secret
  • It answers: "How unpredictable is this string?", which is useful for finding API keys

Semantic similarity (used by embeddings in Week 6) measures MEANING:

  • "connect to database" and "establish DB connection" have high similarity
  • It answers: "Do these texts mean the same thing?", which is useful for finding related code

They solve completely different problems. Entropy is a statistical measure of randomness. Similarity is a learned measure of semantic relatedness.
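As a rough illustration of the entropy side, the sketch below computes Shannon entropy from character frequencies using the textbook formula. It is not detect-secrets' own implementation (which also applies charset-specific handling and thresholds); `shannon_entropy` is a helper written for this example.

```python
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Shannon entropy in bits per character, from character frequencies."""
    if not text:
        return 0.0
    total = len(text)
    counts = Counter(text)
    # H = sum over characters of p * log2(1 / p), with p = count / total
    return sum((n / total) * math.log2(total / n) for n in counts.values())


for sample in ("hello", "a3f8Kx9m2Q"):
    print(f"{sample!r}: {shannon_entropy(sample):.2f} bits/char")
# 'hello': 1.92 bits/char
# 'a3f8Kx9m2Q': 3.32 bits/char
```

The random-looking string scores higher because every character is distinct, while "hello" repeats the letter "l". Nothing in this calculation looks at what the string means, which is the gap embeddings fill.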

Interview talking point: "We use Shannon entropy in detect-secrets to find API keys (high-entropy strings are likely secrets) and semantic embeddings in RAG to find related code (semantically similar chunks are likely relevant context). These are complementary techniques: entropy operates on individual strings, embeddings operate on meaning across entire code blocks."


Concept 3: Code Chunking Strategy

Why We Chunk

The embedding model has a maximum input length (~256 tokens for MiniLM; anything beyond that is truncated), and even within that limit, shorter inputs produce better embeddings. A 500-line file would produce a diluted embedding that weakly matches many topics. A 60-line function produces a focused embedding that strongly matches its specific topic.

The chunk_code() Function β€” Walkthrough

def chunk_code(content: str, filepath: str, chunk_size: int = 60) -> list[dict]:
    """
    Split source code into overlapping chunks for embedding.
    """
    lines = content.split("\n")
    chunks = []
    overlap = 10           # Lines shared between adjacent chunks
    start = 0

    while start < len(lines):
        end = min(start + chunk_size, len(lines))
        chunk_text = "\n".join(lines[start:end])

        # Skip very small chunks (less than 5 non-empty lines)
        #   WHY: A chunk of blank lines and comments has no semantic
        #   content worth embedding. It would waste storage and produce
        #   misleading similarity matches.
        non_empty = sum(1 for line in lines[start:end] if line.strip())
        if non_empty >= 5:
            chunks.append({
                "text": f"# File: {filepath}\n{chunk_text}",
                #         ^^^^^^^^^^^^^^^^^
                #         Filepath prepended so the embedding model
                #         "sees" the file path as part of the content.
                #         A query about "database" will match chunks in
                #         db/connection.py partly because of the filepath.
                "filepath": filepath,
                "start_line": start + 1,    # 1-indexed for human readability
                "end_line": end,
            })

        start += chunk_size - overlap   # Move forward, but keep 10 lines of overlap
    return chunks

Why 60 Lines Per Chunk?

This is the Goldilocks zone for code:

Too small (10 lines):
    def get_user(user_id):        <-- Just the signature
        conn = sqlite3.connect(   <-- No context about what happens next
    ...
    PROBLEM: Loses context. A function signature without its body is useless
    for understanding behavior.

Too large (200 lines):
    class UserService:            <-- Database logic
        def get_user(...): ...
        def update_user(...): ... <-- Authentication logic
        def delete_user(...): ... <-- Logging logic
        def validate(...): ...
    ...
    PROBLEM: Dilutes the embedding signal. A 200-line chunk about
    "database queries AND logging AND error handling" will weakly match
    all three topics instead of strongly matching one.

Just right (60 lines = ~one function/class):
    def get_user(user_id):
        conn = sqlite3.connect("users.db")
        query = "SELECT * FROM users WHERE id = ?"
        return conn.execute(query, (user_id,)).fetchone()
    ...
    GOOD: Captures a single concept well. The embedding strongly represents
    "database query for user lookup" and will match queries about DB access.

Why 10 Lines of Overlap?

Without overlap, a function that spans lines 55-70 would be split across two chunks:

Without overlap:                 With 10-line overlap:
  Chunk 1: lines 1-60             Chunk 1: lines 1-60
  Chunk 2: lines 61-120           Chunk 2: lines 51-110
                                             ^^^^^^^^
                                             overlap zone (lines 51-60)

  Function at lines 55-70:        Function at lines 55-70:
    Chunk 1 has lines 55-60         Chunk 1 has lines 55-60 (partial)
    Chunk 2 has lines 61-70         Chunk 2 has lines 51-70 (COMPLETE!)
    NEITHER chunk has the           Chunk 2 has the full function.
    complete function!

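The trade-off: With a 10-line overlap the window advances 50 lines per step instead of 60, so overlap means ~20% more chunks (and therefore ~20% more embedding computation and storage). Where non-overlapping chunking would yield 200 chunks, that is 40 extra chunks, a worthwhile trade for context integrity.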
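The window arithmetic is easy to check in isolation. The sketch below mirrors the loop in chunk_code() but returns only the 1-indexed line ranges; `chunk_ranges` is a hypothetical helper for illustration and deliberately ignores the "at least 5 non-empty lines" filter.

```python
def chunk_ranges(
    total_lines: int, chunk_size: int = 60, overlap: int = 10
) -> list[tuple[int, int]]:
    """Return 1-indexed (start_line, end_line) pairs for each chunk window."""
    ranges = []
    start = 0
    while start < total_lines:
        end = min(start + chunk_size, total_lines)
        ranges.append((start + 1, end))
        start += chunk_size - overlap  # Stride of 50 lines with the defaults
    return ranges


print(chunk_ranges(120))
# [(1, 60), (51, 110), (101, 120)]

with_overlap = len(chunk_ranges(3000, overlap=10))
without_overlap = len(chunk_ranges(3000, overlap=0))
print(with_overlap, without_overlap)
# 60 50
```

For a 120-line file the second window starts at line 51, so a function spanning lines 55-70 sits entirely inside it. The 3000-line case shows the cost side: a stride of 50 instead of 60 produces 60 windows instead of 50.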

Why Skip Chunks with <5 Non-Empty Lines?

A chunk that is mostly blank lines, comments, or whitespace has no meaningful semantic content. Embedding it would:

  1. Waste ChromaDB storage space
  2. Produce misleading similarity matches (blank chunks might match other blank chunks)
  3. Add noise to the retrieval results

The threshold of 5 is deliberately conservative: even a short function like def add(a, b): return a + b with some surrounding context will pass.

Interview talking point: "Our chunking strategy uses 60-line windows with 10-line overlap, tuned for the natural granularity of source code: roughly one function or class per chunk. The overlap ensures functions spanning chunk boundaries remain complete in at least one chunk. We skip near-empty chunks to avoid polluting the vector store with semantically meaningless content. The filepath is prepended to each chunk so the embedding model can use it as a semantic signal, so queries about 'database' will naturally match chunks from files in the db/ directory."


Concept 4: ChromaDB - Embedded Vector Database

What ChromaDB Is

ChromaDB is an open-source vector database that stores embeddings alongside the original documents and metadata. Unlike Postgres or Redis (which store rows or key-value pairs), ChromaDB is optimized for similarity search: "find the 5 stored items most similar to this query."

The key differentiator: ChromaDB runs embedded in the Python process. No separate server, no Docker container, no network calls, no infrastructure to manage. You pip install chromadb and call chromadb.Client().

Why In-Memory Mode?

We use chromadb.Client() (in-memory, no persistence) instead of chromadb.PersistentClient(path="./data") because Render's free tier has ephemeral storage: files on disk are lost whenever the service restarts.

This means the vector index is rebuilt on every PR review. Is that acceptable?

Indexing cost per PR review:
    Typical PR: 5-20 changed files, 50-200 code chunks
    Embedding time: ~10ms per chunk x 200 chunks = ~2 seconds
    ChromaDB upsert time: ~100ms total
    Total indexing overhead: ~2 seconds

    Verdict: Acceptable. The LLM calls take 3-7 seconds each.
    2 seconds of indexing is a small fraction of total review time.

In a production system with persistent storage (paid Render tier, AWS ECS, etc.), you would use PersistentClient so the index survives restarts and only needs incremental updates.

Collection-Per-Repo Pattern

Each GitHub repository gets its own ChromaDB collection. This provides natural isolation β€” code from repo-A doesn't contaminate retrieval results for repo-B.

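def _collection_name(repo_full_name: str) -> str:
    """Generate a valid ChromaDB collection name from a repo name.

    ChromaDB collection names must be:
    - 3-63 characters long
    - Made of alphanumerics, underscores, hyphens, and dots only (no slashes),
      starting and ending with an alphanumeric character

    GitHub repo names like "ninjacode911/code-guard-test" contain a slash,
    so they are rejected as-is. Hyphens are normalized to underscores as well
    to keep every generated name in one uniform style.
    """
    # "ninjacode911/code-guard-test" --> "repo_ninjacode911_code_guard_test"
    name = repo_full_name.replace("/", "_").replace("-", "_")
    # Enforce max length with a slice, then drop any trailing underscore so
    # the name still ends with an alphanumeric character.
    return f"repo_{name}"[:63].rstrip("_")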

This sanitizer was born from Bug #3 (see Bugs section): ChromaDB rejected invalid names with an opaque error message that took an hour to debug.

Upsert for Idempotent Indexing

We use collection.upsert() instead of collection.add(). The difference:

| Operation | If ID exists | If ID doesn't exist |
|---|---|---|
| add() | Raises an error (duplicate) | Inserts new document |
| upsert() | Updates the existing document | Inserts new document |

Why this matters: When a developer pushes a fix to the same PR, we re-review it. The same files get indexed again. With upsert, re-indexing just overwrites the old vectors instead of creating duplicates or crashing.

The ID format filepath:start_line (e.g., "app.py:1", "app.py:51") ensures each chunk position is unique within a collection.

# Upsert into ChromaDB
ids = [f"{chunk['filepath']}:{chunk['start_line']}" for chunk in all_chunks]
# Examples: ["app.py:1", "app.py:51", "utils.py:1", "utils.py:51"]

collection.upsert(
    ids=ids,                  # Unique ID per chunk
    embeddings=embeddings,    # 384-dimensional vectors
    documents=texts,          # Original code text (for returning in results)
    metadatas=metadatas,      # filepath, start_line, end_line (for display)
)
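The idempotency argument can be checked without ChromaDB at all. The sketch below uses a plain dict as a stand-in for a collection (an assumption for illustration only; `toy_upsert` and `chunk_id` are hypothetical helpers, not the ChromaDB API) to show that keying on filepath:start_line makes re-indexing overwrite rather than duplicate.

```python
def chunk_id(chunk: dict) -> str:
    """Same ID scheme as above: filepath plus 1-indexed start line."""
    return f"{chunk['filepath']}:{chunk['start_line']}"


def toy_upsert(store: dict[str, str], chunks: list[dict]) -> None:
    """Insert-or-overwrite by ID; a dict stand-in for collection.upsert()."""
    for chunk in chunks:
        store[chunk_id(chunk)] = chunk["text"]


store: dict[str, str] = {}
first_push = [
    {"filepath": "app.py", "start_line": 1, "text": "def get_user(): ..."},
    {"filepath": "app.py", "start_line": 51, "text": "def save_user(): ..."},
]
# The developer pushes a fix: same chunk positions, new text for one of them.
second_push = [
    {"filepath": "app.py", "start_line": 1, "text": "def get_user(user_id): ..."},
    {"filepath": "app.py", "start_line": 51, "text": "def save_user(): ..."},
]

toy_upsert(store, first_push)
toy_upsert(store, second_push)
print(sorted(store))      # ['app.py:1', 'app.py:51']
print(store["app.py:1"])  # def get_user(user_id): ...
```

One limitation visible even in this toy version: if a file gets shorter, IDs for chunk positions that no longer exist are never overwritten, so a long-lived store would need an explicit cleanup step for them.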

Why ChromaDB Over Alternatives?

| Vector DB | Pros | Cons | Our Choice |
|---|---|---|---|
| ChromaDB | Embedded (no server), Python-native, simple API | Limited scale (~1M vectors) | Yes: simplicity wins for MVP |
| Pinecone | Managed, scalable, fast | Requires API key, costs money, vendor lock-in | No |
| pgvector | Uses existing Postgres | Requires DB setup, slower queries | Maybe later for production |
| FAISS | Facebook's library, very fast | No metadata storage, manual management | No: too low-level |
| Weaviate | Full-featured, GraphQL API | Heavy, requires Docker or cloud | No: overkill |

Interview talking point: "We use ChromaDB in embedded mode, so it runs inside the Python process with zero infrastructure. The trade-off is in-memory only storage on Render's free tier, so we rebuild the index on each review. This is acceptable because indexing 10-20 files takes under 2 seconds. Each repo gets its own collection identified by a sanitized version of the GitHub repo name, and we use upsert semantics to handle re-indexing gracefully without duplicates."


Concept 5: Retrieval - Finding Relevant Code

How Similarity Search Works

When we embed the PR diff and query ChromaDB, the database performs approximate nearest neighbor (ANN) search. In simplified terms:

Step 1: Embed the query (PR diff)
    "def get_user(user_id):\n    query = f'SELECT...'"
         |
         v
    embed_texts() --> [0.34, -0.21, 0.76, ...]   (384 numbers)

Step 2: Compare against all stored vectors
    Stored chunk 1 (db/connection.py):     distance = 0.42  (close!)
    Stored chunk 2 (auth/middleware.py):    distance = 0.87  (somewhat close)
    Stored chunk 3 (utils/logging.py):     distance = 1.95  (far away)
    Stored chunk 4 (db/models.py):         distance = 0.55  (close)
    Stored chunk 5 (tests/test_app.py):    distance = 2.31  (very far)

Step 3: Return top-K by distance (K=5)
    Result: [chunk1, chunk4, chunk2, ...]  sorted by relevance (smallest distance first)

L2 Distance to Similarity Conversion

ChromaDB uses L2 distance by default, and its l2 space reports the squared Euclidean distance. Lower distance = more similar. But humans think in terms of "similarity" (higher = more similar), so we convert:

# ChromaDB returns (squared) L2 distance -- lower = more similar
# Convert to 0-1 similarity score -- higher = more similar
similarity = max(0, 1 - distance / 2)

Why distance / 2? For unit-length embeddings (which MiniLM produces), squared L2 distance equals 2 - 2*cos(angle), so it ranges from 0 (identical) to 4 (opposite directions), and 1 - distance / 2 is exactly the cosine similarity. The max(0, ...) clamp maps every distance of 2 or more to 0, which gives a 0-1 scale where 1 = identical and 0 = unrelated.
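The conversion can be sanity-checked numerically. The sketch below assumes unit-length vectors and a distance reported as squared Euclidean distance (ChromaDB's documented behavior for its default l2 space); under those assumptions 1 - distance / 2 coincides with cosine similarity. All helper names here are illustrative.

```python
import math


def normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def squared_l2(a: list[float], b: list[float]) -> float:
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a: list[float], b: list[float]) -> float:
    # A plain dot product is the cosine only because both inputs are unit-length.
    return sum(x * y for x, y in zip(a, b))


def distance_to_similarity(distance: float) -> float:
    """The conversion from the text, clamped at zero."""
    return max(0.0, 1 - distance / 2)


a = normalize([3.0, 4.0, 0.0])
b = normalize([4.0, 3.0, 0.0])
d = squared_l2(a, b)
print(f"squared L2 = {d:.2f}")
print(f"similarity = {distance_to_similarity(d):.2f}, cosine = {cosine(a, b):.2f}")
# squared L2 = 0.08
# similarity = 0.96, cosine = 0.96
```

Opposite unit vectors have a squared distance of 4, which the clamp turns into a similarity of 0 rather than -1.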

Why Filter by Similarity Threshold (0.3)?

Without filtering, ChromaDB ALWAYS returns top-K results, even if they're completely irrelevant. In a small collection with only 3 chunks, ALL three will be returned even if none are related to the query.

if similarity < 0.3:
    continue  # Skip low-relevance results

Why 0.3? This threshold was chosen empirically:

  • Above 0.7: Very high confidence. The chunk is clearly about the same topic
  • 0.3 to 0.7: Moderate relevance. May contain useful context
  • Below 0.3: Likely noise. Including it would confuse the LLM more than help it

Setting it too high (0.7) would miss useful-but-not-exact matches. Setting it too low (0.1) would include irrelevant code that wastes LLM context tokens and might cause hallucinations.
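Applied to the five example distances from the similarity-search walkthrough, the conversion and threshold work out as in the sketch below. `filter_relevant` is a hypothetical helper, and the real retriever's data structures are not shown in this document, so the tuple layout is an assumption.

```python
SIMILARITY_THRESHOLD = 0.3

# (filepath, distance) pairs taken from the worked example above.
raw_results = [
    ("db/connection.py", 0.42),
    ("auth/middleware.py", 0.87),
    ("utils/logging.py", 1.95),
    ("db/models.py", 0.55),
    ("tests/test_app.py", 2.31),
]


def filter_relevant(results, threshold=SIMILARITY_THRESHOLD):
    """Convert distances to similarities, drop weak matches, sort best-first."""
    kept = []
    for filepath, distance in results:
        similarity = max(0.0, 1 - distance / 2)
        if similarity < threshold:
            continue  # Skip low-relevance results
        kept.append((filepath, round(similarity, 3)))
    return sorted(kept, key=lambda item: item[1], reverse=True)


for filepath, similarity in filter_relevant(raw_results):
    print(f"{filepath}: similarity {similarity}")
```

Three of the five chunks survive (the two db/ files and the auth middleware). The logging and test chunks fall below 0.3 and are dropped even though ChromaDB returned them.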

The Query Cap: 5000 Characters

query_embeddings = embed_texts([query_text[:5000]])  # Cap query size

Why cap the query? The PR diff for a 100-file refactoring could be 50,000 characters. Embedding all of it would:

  1. Dilute the semantic signal β€” a query about "everything" matches nothing well
  2. Exceed the embedding model's effective context window
  3. Be slow (embedding time scales with input length)

The first 5000 characters typically capture the most important changes (the primary files modified, the core logic changes). Later changes are often test updates, import fixes, or boilerplate that don't help with retrieval.

How Retrieved Context Is Formatted

The retriever formats results as Markdown that the LLM can parse:

## Related Code Context (from repository)

### app/db/connection.py (lines 1-60, relevance: 78%)

    class DatabaseConnection:
        def execute(self, query, params=None):
            return self.cursor.execute(query, params)


### app/middleware/auth.py (lines 20-80, relevance: 65%)

    def validate_user_id(user_id):
        if not isinstance(user_id, int):
            raise ValueError("Invalid user ID")


Each chunk includes:

  • Filepath: so the LLM knows WHERE the code lives
  • Line range: so the LLM can reference it precisely
  • Relevance score: so the LLM can weight high-relevance chunks more
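A formatter producing this layout could look roughly like the following. This is a sketch, not the project's retriever: `format_context` and the dictionary keys are assumed names, and whether the real output wraps each chunk in a code fence is not visible from the sample above.

```python
def format_context(results: list[dict]) -> str:
    """Render retrieved chunks as a Markdown section for the LLM prompt."""
    if not results:
        return ""  # Nothing relevant: the prompt placeholder stays empty
    sections = ["## Related Code Context (from repository)"]
    for r in results:
        header = (
            f"### {r['filepath']} "
            f"(lines {r['start_line']}-{r['end_line']}, "
            f"relevance: {r['similarity']:.0%})"
        )
        sections.append(f"{header}\n\n{r['text']}")
    return "\n\n".join(sections)


example = [{
    "filepath": "app/db/connection.py",
    "start_line": 1,
    "end_line": 60,
    "similarity": 0.78,
    "text": "class DatabaseConnection:\n    def execute(self, query, params=None): ...",
}]
print(format_context(example))
```

Returning an empty string when nothing passes the threshold keeps the prompt template valid without any special casing in the agents.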

Interview talking point: "The retriever converts L2 distance to a similarity score and filters below 0.3 to prevent noise. We cap the query at 5000 characters because embedding the entire diff of a 100-file PR would dilute the semantic signal. Retrieved context is formatted with filepath and relevance scores so the LLM can weight each chunk appropriately."


Concept 6: asyncio.gather() - Parallel Agent Execution

Why Parallel Execution Matters: The Latency Math

Each agent makes an HTTP call to Groq's API and waits for the response. If we run them sequentially, we wait for each one to finish before starting the next:

Sequential execution (BEFORE):
    Security Agent:     ################        5.2s
    Performance Agent:                  ################        4.8s
    Style Agent:                                        ################  3.5s
    Total:              ================================================ 13.5s
                                                         SUM of all three

Parallel execution (AFTER):
    Security Agent:     ################        5.2s
    Performance Agent:  ################        4.8s     <-- running simultaneously
    Style Agent:        ################        3.5s     <-- running simultaneously
    Total:              ================        5.2s
                                         MAX of the three

    Speedup: 13.5s --> 5.2s = 2.6x faster

This is not a marginal improvement. For the developer waiting for the review, 13.5 seconds feels annoyingly slow. 5.2 seconds feels responsive.

How Python Async Works: Event Loop and Coroutines

asyncio is NOT multithreading. It runs on a single thread using cooperative multitasking. The key insight: our agents spend 95% of their time WAITING for Groq's HTTP response. During that wait, the CPU is idle. asyncio uses that idle time to run other tasks.

How asyncio.gather() executes 3 agent reviews:

Time   Event Loop Activity
----   ---------------------------------------------------
0ms    Start Security Agent --> sends HTTP request to Groq
1ms    CPU is FREE (waiting for network) --> start Performance Agent
2ms    CPU is FREE --> start Style Agent
3ms    All 3 HTTP requests are "in flight" simultaneously
       ...
       ... (waiting for Groq to respond -- CPU is idle)
       ...
3500ms Groq responds to Style Agent --> resume, process result
4800ms Groq responds to Performance Agent --> resume, process result
5200ms Groq responds to Security Agent --> resume, process result
5200ms asyncio.gather() returns all 3 results

Important: This works because the bottleneck is network I/O (waiting for Groq), not CPU computation. While waiting for a network response, the CPU has nothing to do, so asyncio fills that idle time with other coroutines.
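The SUM-versus-MAX behavior can be reproduced with simulated agents. In the sketch below asyncio.sleep() stands in for the wait on the LLM API, and the latencies are the example figures from above scaled down 100x so the demo finishes quickly; `fake_agent` and the other names are illustrative, not the project's code.

```python
import asyncio
import time


async def fake_agent(name: str, latency: float) -> list[str]:
    """Stand-in for an agent: the sleep simulates waiting on the LLM API."""
    await asyncio.sleep(latency)
    return [f"{name}-finding"]


# Latencies from the example above, scaled down 100x.
AGENTS = [("security", 0.052), ("performance", 0.048), ("style", 0.035)]


async def run_sequential() -> float:
    """Await each agent in turn; elapsed time is roughly the sum."""
    start = time.perf_counter()
    for name, latency in AGENTS:
        await fake_agent(name, latency)
    return time.perf_counter() - start


async def run_parallel() -> float:
    """Start all agents together; elapsed time is roughly the slowest one."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_agent(n, lat) for n, lat in AGENTS))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    return await run_sequential(), await run_parallel()


if __name__ == "__main__":
    sequential, parallel = asyncio.run(main())
    print(f"sequential: {sequential:.3f}s  parallel: {parallel:.3f}s")
```

Exact timings vary by machine, but the sequential run should land near the sum of the three sleeps and the parallel run near the longest one. Replacing asyncio.sleep() with a blocking time.sleep() would remove the benefit, since a blocking call never yields control back to the event loop.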

Coroutines vs. Threads vs. Processes

| Approach | Overhead | Best For | Python Limitation |
|---|---|---|---|
| asyncio (coroutines) | Minimal (~few KB per task) | I/O-bound work (HTTP calls, DB queries) | Single thread, cooperative |
| threading | ~8 MB per thread stack | I/O-bound work with blocking libraries | GIL prevents true CPU parallelism |
| multiprocessing | Full process (~30 MB+) | CPU-bound work (ML inference, math) | IPC overhead, no shared memory |

Why asyncio for our agents: Each agent is I/O-bound (waiting for Groq's API). asyncio has near-zero overhead per coroutine and avoids the GIL contention that threads suffer from. If we were doing CPU-intensive work (like running the embedding model on 1000 chunks), we would use multiprocessing instead.

The gather() + Graceful Degradation Pattern

Here is the actual code in main.py:

# Create all three agents
security_agent = SecurityAgent()
performance_agent = PerformanceAgent()
style_agent = StyleAgent()

# Run all three concurrently -- total time = max(agent times), not sum
security_findings, performance_findings, style_findings = await asyncio.gather(
    security_agent.review(pr_data, rag_context),
    performance_agent.review(pr_data, rag_context),
    style_agent.review(pr_data, rag_context),
)

# Merge results from all agents
findings = security_findings + performance_findings + style_findings

The graceful degradation part: Each agent handles its own exceptions internally (in BaseAgent.review()). If one agent fails (Groq times out, the model returns invalid JSON, the static tool crashes), it catches the exception and returns [] (empty list). This means asyncio.gather() NEVER sees an exception. All three calls always "succeed."

What happens if Performance Agent's Groq call times out:

    asyncio.gather(
        security_agent.review(...)      --> returns [finding1, finding2]    OK
        performance_agent.review(...)   --> catches exception, returns []   FAILED GRACEFULLY
        style_agent.review(...)         --> returns [finding3, finding4]    OK
    )
    # Result: [finding1, finding2] + [] + [finding3, finding4]
    #       = 4 findings from 2 agents
    # Better than crashing the entire pipeline!

Why not use asyncio.gather(return_exceptions=True)? That would return the Exception object in the results list instead of raising it. But we don't need it, because our agents already handle exceptions internally. Using return_exceptions=True would complicate the calling code (need to check if each result is a list or an Exception) for no benefit.
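The two options can be compared side by side with a toy agent. `ToyAgent` below is a hypothetical stand-in for BaseAgent (whose internals are not shown in this section); its review() swallows exceptions the way the text describes, while `_call_llm()` is the unguarded call.

```python
import asyncio


class ToyAgent:
    """Hypothetical stand-in for BaseAgent: review() never raises."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail

    async def _call_llm(self) -> list[str]:
        await asyncio.sleep(0.01)  # Simulated network wait
        if self.fail:
            raise TimeoutError(f"{self.name}: LLM call timed out")
        return [f"{self.name}-finding"]

    async def review(self) -> list[str]:
        try:
            return await self._call_llm()
        except Exception:
            return []  # Degrade gracefully: no findings from this agent


def make_agents() -> list[ToyAgent]:
    return [ToyAgent("security"), ToyAgent("performance", fail=True), ToyAgent("style")]


async def run_review() -> list[str]:
    results = await asyncio.gather(*(agent.review() for agent in make_agents()))
    # Every element is a list, so merging needs no isinstance() checks.
    return [finding for agent_findings in results for finding in agent_findings]


async def run_unguarded() -> list:
    # Calling the unguarded method shows what the caller would have to handle.
    return await asyncio.gather(
        *(agent._call_llm() for agent in make_agents()), return_exceptions=True
    )


if __name__ == "__main__":
    print(asyncio.run(run_review()))
    print(asyncio.run(run_unguarded()))
```

In the first run the failing agent simply contributes nothing. In the second, the result list mixes lists with a TimeoutError instance, so the caller would need a type check before concatenating.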

Interview talking point: "We run all three agents concurrently using asyncio.gather(), which reduces total latency from the sum of agent times to the maximum, a 2.6x speedup in practice. This works because each agent is I/O-bound (waiting for the Groq API), not CPU-bound, so asyncio's cooperative multitasking uses the idle wait time to service other agents. Each agent handles exceptions internally, so a single agent failure doesn't crash the pipeline: the remaining agents' findings are still posted."


Concept 7: Integration into base_agent.py - The rag_context Parameter

What Changed

The review() method in BaseAgent was updated to accept an optional rag_context parameter:

# BEFORE (Week 3):
async def review(self, pr_data: PRData) -> list[Finding]:

# AFTER (Week 6):
async def review(self, pr_data: PRData, rag_context: str = "") -> list[Finding]:
#                                       ^^^^^^^^^^^^^^^^^^^^
#                                       New parameter with empty default

How RAG Context Reaches the LLM

The prompt template was updated to include a {rag_context} placeholder:

def _build_prompt(self) -> ChatPromptTemplate:
    return ChatPromptTemplate.from_messages([
        ("system", self.system_prompt),
        ("human", (
            "## PR Diff\n"
            "```diff\n{diff}\n```\n\n"
            "## Changed File Contents\n"
            "{file_contents}\n\n"
            "## Static Analysis Results\n"
            "{static_analysis}\n\n"
            "{rag_context}\n\n"              # <-- RAG context injected here
            "Analyze this PR and return your findings as structured JSON."
        )),
    ])

And in the review() method, the context is passed through:

result = await chain.ainvoke({
    "diff": pr_data.diff[:15000],
    "file_contents": self._format_file_contents(pr_data.file_contents),
    "static_analysis": static_results or "No static analysis results.",
    "rag_context": rag_context or "",   # <-- Injected here
})

Why rag_context Defaults to Empty String

This design decision embodies the graceful degradation principle:

  1. If RAG fails (model not loaded, ChromaDB error, no relevant chunks found), the agents still work; they just have less context. The LLM prompt simply has an empty string where the RAG context would be.
  2. In tests, we don't need to mock the entire RAG pipeline. Just call agent.review(pr_data) without the second argument.
  3. Backward compatibility: existing code that calls review(pr_data) without rag_context continues to work without modification.

This follows a pattern sometimes called "fail-open" in security contexts: RAG is an enhancement, not a requirement. Reviews still work without it; they're just less informed.


Concept 8: Integration into main.py - The Full Updated Pipeline

The Complete Flow

Here is how everything connects in the _process_pr_review function:

async def _process_pr_review(repo_full_name, pr_number, commit_sha, installation_id):
    """Background task: fetch PR data and post a review."""

    # --- Step 1: Fetch PR data (Week 2) ---
    client = GitHubClient(installation_id)
    pr_data = await client.fetch_pr_data(repo_full_name, pr_number)

    # --- Step 2: RAG: index files into ChromaDB (Week 6 NEW) ---
    rag_context = ""
    try:
        collection_name = await index_repo_files(
            repo_full_name, pr_data.file_contents
        )
        # --- Step 3: RAG: retrieve relevant context (Week 6 NEW) ---
        rag_context = await retrieve_context(
            collection_name, pr_data.diff[:5000]
        )
    except Exception as rag_err:
        logger.warning("RAG context unavailable", error=str(rag_err))
        # Continue without RAG (fail-open pattern)

    # --- Step 4: Run 3 agents in parallel (Week 6 NEW) ---
    security_agent = SecurityAgent()
    performance_agent = PerformanceAgent()
    style_agent = StyleAgent()

    security_findings, performance_findings, style_findings = await asyncio.gather(
        security_agent.review(pr_data, rag_context),
        performance_agent.review(pr_data, rag_context),
        style_agent.review(pr_data, rag_context),
    )

    # --- Step 5: Merge findings and compute health score ---
    findings = security_findings + performance_findings + style_findings
    # ... health score calculation, SynthesizedReview construction ...

    # --- Step 6: Post to GitHub ---
    # ... inline comments with fallback to summary comment ...

    # --- Step 7: Cache in Redis ---
    await mark_as_reviewed(commit_sha)
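
The gather call in Step 4 relies on each agent catching its own errors and returning []. A defensive variant, sketched here with hypothetical stand-in coroutines (ok_agent, broken_agent) rather than the real agent classes, passes return_exceptions=True so that an exception escaping one agent cannot discard the other agents' findings.

```python
import asyncio


async def ok_agent(name: str) -> list[str]:
    await asyncio.sleep(0.01)          # Stands in for the LLM call
    return [f"{name}: finding"]


async def broken_agent() -> list[str]:
    await asyncio.sleep(0.01)
    raise RuntimeError("LLM call timed out")


async def run_agents() -> list[str]:
    results = await asyncio.gather(
        ok_agent("security"),
        broken_agent(),
        ok_agent("style"),
        return_exceptions=True,        # Exceptions come back as values
    )
    findings: list[str] = []
    for result in results:
        if isinstance(result, BaseException):
            continue                   # Treat a failed agent as "no findings"
        findings.extend(result)
    return findings


merged = asyncio.run(run_agents())
```

gather() preserves argument order in its result list, so the merged findings stay in security, performance, style order regardless of which agent finishes first.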

What Changed from Previous Weeks

| Component | Before Week 6 | After Week 6 |
|---|---|---|
| RAG | Not present | Index files --> embed --> store --> query --> retrieve |
| Agent execution | Sequential: findings = await security_agent.review(pr_data) | Parallel: asyncio.gather(agent1.review(...), agent2.review(...), agent3.review(...)) |
| Agent review() | review(pr_data): one argument | review(pr_data, rag_context): two arguments |
| LLM prompt | diff + files + static analysis | diff + files + static analysis + RAG context |
| Error handling | Agent-level only | Agent-level + RAG-level (try/except around RAG pipeline) |

The try/except Around the RAG Pipeline

Notice that the entire RAG block (index + retrieve) is wrapped in a try/except:

rag_context = ""
try:
    collection_name = await index_repo_files(...)
    rag_context = await retrieve_context(...)
except Exception as rag_err:
    logger.warning("RAG context unavailable", error=str(rag_err))

This means that if anything in the RAG path raises an exception (sentence-transformers not installed, ChromaDB crashes, the embedding call fails), the pipeline continues with rag_context = "". The agents receive an empty string for RAG context and proceed with diff + files + static analysis only. This is the fail-open pattern applied at the pipeline level. Note that the try/except only catches errors that are raised; low-quality retrieval results are handled separately by the similarity threshold in the retriever.
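
The same block can be factored into a helper, which makes the fail-open behaviour easy to exercise in isolation. This is a sketch under assumptions: get_rag_context_or_empty and the stub coroutines are hypothetical names, and the standard logging module stands in for the project's structured logger.

```python
import asyncio
import logging

logger = logging.getLogger("rag")


async def get_rag_context_or_empty(index_fn, retrieve_fn, files: dict, diff: str) -> str:
    """Run index + retrieve; on any error, log a warning and return ""."""
    try:
        collection_name = await index_fn(files)
        return await retrieve_fn(collection_name, diff[:5000])
    except Exception as err:
        logger.warning("RAG context unavailable: %s", err)
        return ""


# Hypothetical stubs standing in for index_repo_files / retrieve_context
async def failing_index(files: dict) -> str:
    raise RuntimeError("ChromaDB unavailable")


async def working_index(files: dict) -> str:
    return "repo_demo"


async def working_retrieve(collection_name: str, query: str) -> str:
    return f"context from {collection_name}"


degraded = asyncio.run(get_rag_context_or_empty(failing_index, working_retrieve, {}, "diff"))
healthy = asyncio.run(get_rag_context_or_empty(working_index, working_retrieve, {}, "diff"))
```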


Code Walkthroughs

embedder.py - The Embedding Pipeline

File: app/context/embedder.py

This file has three responsibilities:

  1. Lazy-load the sentence-transformers model
  2. Convert text to embeddings
  3. Chunk source code into embeddable pieces

# Lazy-loaded model to avoid slow import at startup
_model = None

def get_embedding_model():
    """
    Lazy-load the sentence-transformers model.

    We load on first use (not at import time) because:
    1. The model takes ~2 seconds to load from cache (~56s cold download)
    2. Not every request needs embeddings (cached reviews skip this)
    3. Tests shouldn't load a real ML model; they mock embed_texts()
    """
    global _model
    if _model is None:
        try:
            from sentence_transformers import SentenceTransformer
            #     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            #     Import is INSIDE the function, not at module top.
            #     This means importing embedder.py is instant.
            #     The heavy SentenceTransformer import only happens
            #     when someone actually calls get_embedding_model().
            _model = SentenceTransformer(settings.embedding_model)
            #        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
            #        settings.embedding_model = "all-MiniLM-L6-v2"
            #        On first call, this downloads ~90MB from HuggingFace.
            #        Subsequent calls use the cached model (~2s load).
        except ImportError:
            logger.warning("sentence-transformers not installed -- RAG disabled")
            return None
    return _model

def embed_texts(texts: list[str]) -> list[list[float]]:
    """Convert text strings to 384-dimensional vectors."""
    model = get_embedding_model()
    if model is None:
        return []         # Graceful degradation if model unavailable

    embeddings = model.encode(texts, show_progress_bar=False)
    #            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    #            Batch encoding: more efficient than encoding one at a time
    #            because the model can process multiple inputs in a single
    #            forward pass through the neural network.
    return embeddings.tolist()
    #      ^^^^^^^^^^^^^^^^^^^
    #      Convert from NumPy array to Python list (ChromaDB expects lists)
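
chunk_code() is listed among this file's responsibilities but is not shown above. The following is a sketch of what such a function might look like, assuming the behaviour described by the Week 6 tests (60-line chunks, 10-line overlap, a "# File:" header, near-empty chunks dropped). The parameter names and dictionary keys are assumptions, not the project's actual code.

```python
def chunk_code(
    content: str,
    filepath: str,
    chunk_size: int = 60,
    overlap: int = 10,
    min_non_empty: int = 5,
) -> list[dict]:
    """Split source text into overlapping line windows."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    lines = content.splitlines()
    step = chunk_size - overlap
    chunks: list[dict] = []
    start = 0
    while start < len(lines):
        window = lines[start:start + chunk_size]
        # Drop windows that are mostly blank: they add noise to the index
        if sum(1 for line in window if line.strip()) >= min_non_empty:
            chunks.append({
                "text": f"# File: {filepath}\n" + "\n".join(window),
                "filepath": filepath,
                "start_line": start + 1,             # 1-indexed, inclusive
                "end_line": start + len(window),
            })
        if start + chunk_size >= len(lines):
            break                                    # This window reached the end of the file
        start += step
    return chunks


source = "\n".join(f"line_{i} = {i}" for i in range(1, 151))   # 150-line file
chunks = chunk_code(source, "src/utils/helper.py")
spans = [(c["start_line"], c["end_line"]) for c in chunks]
```

Stopping as soon as a window reaches the end of the file avoids emitting a trailing chunk that is entirely contained in the previous one, which is why a file shorter than 60 lines yields exactly one chunk.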

indexer.py - The ChromaDB Indexer

File: app/context/indexer.py

async def index_repo_files(repo_full_name, file_contents):
    client = _get_chroma_client()              # Singleton ChromaDB client
    collection_name = _collection_name(repo_full_name)  # Sanitize name

    # Get or create a collection for THIS repo (isolation between repos)
    collection = client.get_or_create_collection(
        name=collection_name,
        metadata={"repo": repo_full_name},
    )

    # Chunk all files, skipping files > 100KB (likely binary/generated)
    all_chunks = []
    for filepath, content in file_contents.items():
        if len(content) > 100_000:
            continue                           # Skip huge files
        chunks = chunk_code(content, filepath)
        all_chunks.extend(chunks)

    # Safety: cap total chunks to avoid OOM on Render's 512MB RAM
    max_chunks = settings.max_repo_files_index   # Default: 500
    if len(all_chunks) > max_chunks:
        all_chunks = all_chunks[:max_chunks]

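    # Nothing to index (no files, or every file skipped): return early,
    # since upsert() with empty lists may raise
    if not all_chunks:
        return collection_name

    # Batch embed all chunks (one call to the model)
    texts = [chunk["text"] for chunk in all_chunks]
    embeddings = embed_texts(texts)
    if not embeddings:                 # Model unavailable: skip indexing
        return collection_name

    # Upsert: insert or update (idempotent for re-indexing)
    ids = [f"{chunk['filepath']}:{chunk['start_line']}" for chunk in all_chunks]
    # Metadata keys are assumed here; the retriever reads them back for formatting
    metadatas = [
        {"filepath": c["filepath"], "start_line": c["start_line"], "end_line": c["end_line"]}
        for c in all_chunks
    ]
    collection.upsert(ids=ids, embeddings=embeddings, documents=texts, metadatas=metadatas)

    return collection_name   # Passed to retriever for querying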

retriever.py - The RAG Retriever

File: app/context/retriever.py

async def retrieve_context(collection_name, query_text, top_k=5):
    try:
        client = _get_chroma_client()

        # If collection doesn't exist, there's nothing to retrieve
        try:
            collection = client.get_collection(name=collection_name)
        except Exception:
            return ""       # No index yet -- proceed without RAG

        if collection.count() == 0:
            return ""       # Empty collection -- nothing to search

        # Embed the query using the SAME model used for indexing
        # (critical: mismatched models would produce incompatible vectors)
        query_embeddings = embed_texts([query_text[:5000]])

        # Nearest neighbor search
        results = collection.query(
            query_embeddings=query_embeddings,
            n_results=min(top_k, collection.count()),
            include=["documents", "metadatas", "distances"],
        )

        # Format results, filtering by relevance
        context_parts = ["## Related Code Context (from repository)\n"]
        for doc, metadata, distance in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        ):
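            # ChromaDB's default metric is squared L2. For unit-length embeddings
            # (assumed here), 1 - distance / 2 equals cosine similarity.
            similarity = max(0, 1 - distance / 2)
            if similarity < 0.3:
                continue                             # Skip irrelevant results

            # Metadata keys assumed to match what the indexer stored
            filepath = metadata.get("filepath", "unknown")
            start = metadata.get("start_line", "?")
            end = metadata.get("end_line", "?")
            context_parts.append(
                f"### {filepath} (lines {start}-{end}, relevance: {similarity:.0%})\n"
                f"```\n{doc}\n```\n"
            )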

        if len(context_parts) == 1:      # Only the header, no actual results
            return ""

        return "\n".join(context_parts)

    except Exception as e:
        logger.warning("RAG retrieval failed", error=str(e))
        return ""                        # Fail-open: agents work without RAG
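
The distance-to-similarity conversion in the retriever deserves a quick check. Assuming the collection uses ChromaDB's default squared-L2 metric and the embeddings are unit length (both assumptions about this deployment), the squared distance between two vectors is 2 - 2·cos, so 1 - distance / 2 is exactly the cosine similarity. The helper names below are hypothetical and the vectors are toy 2-D examples.

```python
import math


def squared_l2(a: list[float], b: list[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def distance_to_similarity(distance: float) -> float:
    """Map a squared-L2 distance between unit vectors onto [0, 1]."""
    return max(0.0, 1 - distance / 2)


a = [1.0, 0.0]
b = [0.6, 0.8]          # Also unit length: 0.36 + 0.64 = 1
similarity = distance_to_similarity(squared_l2(a, b))

# Opposite vectors have squared distance 4, which clamps to 0
opposite = distance_to_similarity(squared_l2(a, [-1.0, 0.0]))
```

If the collection were configured with a different metric, or the embeddings were not normalized, the 0.3 threshold would no longer correspond to a cosine similarity of 0.3 and would need to be re-tuned.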

Live Test Results: PR #4

RAG in Action

Webhook received -- PR #4, sha=a1b2c3d4

[Step 1] Fetched PR data: 1 file, 1 with content
[Step 2] Chunking: 1 file --> 1 chunk (file was < 60 lines)
[Step 3] Embedding: 1 chunk --> [0.23, -0.15, 0.87, ...] (384 dims)
[Step 4] ChromaDB upsert: 1 chunk stored in collection "repo_ninjacode911_codeguard_test"
[Step 5] Query: embedded PR diff, searched ChromaDB
[Step 6] Retrieved: 1 relevant chunk (relevance: 72%)
[Step 7] Injected RAG context into all 3 agent prompts
[Step 8] asyncio.gather: 3 agents started concurrently
[Step 9] All agents completed in ~7 seconds (after model load)

The Cold Start Problem

First PR review after deployment:

[00.0s]   Webhook received
[56.2s]   sentence-transformers model downloaded from HuggingFace (COLD START)
[56.8s]   Model loaded, embedding started
[57.0s]   Indexing complete (1 chunk)
[57.2s]   Retrieval complete (1 chunk returned)
[64.0s]   All 3 agents completed
[64.5s]   Posted to GitHub
           Total: ~64 seconds (56s model download + 8s actual work)

Second PR review (model cached):

[00.0s]   Webhook received
[02.0s]   Model loaded from cache
[02.2s]   Indexing complete
[02.4s]   Retrieval complete
[09.0s]   All 3 agents completed
[09.5s]   Posted to GitHub
           Total: ~9 seconds (2s model load + 7s actual work)

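The 56-second cold start is mitigated by the pre-warm cron job from Week 1, which hits the /health endpoint periodically to keep the service warm. The cron does not load the embedding model itself, so the first review after a fresh deployment still pays the download cost, as the first timeline shows. In a future iteration, we could trigger model pre-loading on the /health endpoint itself.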


Bugs Encountered and Fixed

Bug 1: sentence-transformers Cold Start (~56 seconds)

Symptom: First PR review after deployment took ~64 seconds instead of ~9 seconds.

Cause: SentenceTransformer("all-MiniLM-L6-v2") downloads the model from HuggingFace Hub on first use (56 seconds on Render's network). Subsequent loads use the local cache (2 seconds).

Fix: Lazy loading. The model is loaded only when embed_texts() is first called, not at import time, so the server starts and answers health checks immediately. The pre-warm cron (Week 1) keeps the service warm between reviews; the one-time download after a fresh deployment remains unless the model is pre-loaded.

_model = None

def get_embedding_model():
    global _model
    if _model is None:
        from sentence_transformers import SentenceTransformer
        _model = SentenceTransformer(settings.embedding_model)
    return _model

Why not pre-load at server startup? Because the server needs to respond to Render's health check within seconds of starting. If we blocked startup for 56 seconds, Render would think the service crashed and kill it.
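
One way to reconcile both constraints is to start loading the model in a background thread right after startup, so the health check can answer immediately while the download proceeds. The sketch below is framework-agnostic and uses a hypothetical slow_loader in place of SentenceTransformer(...); wiring it into the actual app's startup hook is left as an assumption.

```python
import asyncio
import time

_model = None


def slow_loader() -> str:
    """Hypothetical stand-in for the blocking model download/load."""
    time.sleep(0.2)
    return "model"


def get_embedding_model(loader=slow_loader):
    global _model
    if _model is None:
        _model = loader()
    return _model


async def health() -> dict:
    # Answers immediately, whether or not the model is ready
    return {"status": "ok", "model_ready": _model is not None}


async def main() -> tuple[dict, dict]:
    # Kick off the load in a worker thread; do not await it yet
    warmup = asyncio.create_task(asyncio.to_thread(get_embedding_model))
    await asyncio.sleep(0)             # Let the task start
    early = await health()             # Served while the load is in flight
    await warmup
    late = await health()
    return early, late


early, late = asyncio.run(main())
```

A lock around the load would be needed if a review could arrive while the warm-up is still running; otherwise both callers would see _model is None and load the model twice.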

Bug 2: ChromaDB Collection Name Validation

Symptom: ValueError when creating a ChromaDB collection.

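Cause: ChromaDB collection names must be 3-63 characters, start and end with an alphanumeric character, and otherwise contain only alphanumerics, underscores, hyphens, and periods. GitHub full repo names like ninjacode911/codeguard-test contain a slash, which ChromaDB rejects with an opaque error message. Hyphens are accepted by ChromaDB, but the sanitizer normalizes them to underscores too, so every collection name follows one convention.

Fix: The _collection_name() sanitizer replaces slashes and hyphens with underscores and truncates the result to 63 characters: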

def _collection_name(repo_full_name: str) -> str:
    name = repo_full_name.replace("/", "_").replace("-", "_")
    return f"repo_{name}"[:63]

Lesson: Always validate inputs at system boundaries. ChromaDB's error message was "Expected collection name to match..." without specifying which characters were invalid.
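
Two edge cases remain with the sanitizer as written: repository names can contain other characters (dots, for example), and truncating at 63 characters can leave a trailing underscore. A stricter variant might look like the sketch below. The name safe_collection_name is hypothetical, and it assumes ChromaDB requires names to end with an alphanumeric character.

```python
import re


def safe_collection_name(repo_full_name: str, max_len: int = 63) -> str:
    """Hypothetical stricter variant of _collection_name()."""
    # Replace every character outside [A-Za-z0-9_] (slashes, hyphens, dots, ...)
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", repo_full_name)
    # Truncate, then strip trailing underscores so the name ends alphanumerically
    return f"repo_{cleaned}"[:max_len].rstrip("_")


name = safe_collection_name("ninjacode911/codeguard-test")
dotted = safe_collection_name("octo-org/site.github.io")

# Constructed so that character 63 is an underscore before stripping
long_name = safe_collection_name("org/" + "a" * 53 + "-b")
```

Truncation can still map two different long repository names to the same collection; appending a short hash of the full name would avoid that, at the cost of less readable names.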


Tests Written (Week 6)

test_rag_pipeline.py - 10 Tests

| Test | What It Verifies |
|---|---|
| test_small_file_single_chunk | File < 60 lines produces exactly 1 chunk |
| test_large_file_multiple_chunks | 150-line file produces 2+ overlapping chunks |
| test_chunk_includes_filepath_in_text | "# File: src/utils/helper.py" appears in chunk text |
| test_skips_nearly_empty_chunks | Chunks with < 5 non-empty lines are filtered out |
| test_chunk_metadata_has_line_numbers | start_line=1, end_line=30, overlap starts at 21 |
| test_converts_repo_name_to_valid_collection | Slashes and hyphens replaced, repo_ prefix |
| test_truncates_long_names | Collection names capped at 63 characters |
| test_index_repo_files_returns_collection_name | Indexing returns valid collection name |
| test_index_handles_empty_files | Empty file dict does not crash |
| test_index_skips_large_files | Files > 100KB excluded from embedding |

test_parallel_agents.py - 6 Tests

| Test | What It Verifies |
|---|---|
| test_all_agents_have_unique_names | {"security", "performance", "style"} are distinct |
| test_all_agents_load_prompts | All 3 prompts load without filesystem errors |
| test_prompts_are_domain_specific | Security has "CWE", Performance has "N+1", Style has "naming" |
| test_prompts_have_scope_boundaries | Each prompt says "do not comment on" other domains |
| test_gather_runs_concurrently | 3 x 0.1s tasks complete in < 0.25s (not 0.3s) |
| test_gather_handles_partial_failure | One failing task returns [], others return results |

Total: 16 New Tests Across 2 Files

Test design decisions:

  • Embeddings are mocked: we mock embed_texts() to return [[0.1] * 384] instead of loading the real model. Without mocking, every test run would wait 2-56 seconds for model loading, making the test suite impractically slow.

  • ChromaDB is NOT mocked: we use the real in-memory ChromaDB client in tests. It's fast (milliseconds), deterministic, and requires no setup. Mocking it would hide integration issues between our code and ChromaDB's API.

  • Parallel execution is tested with asyncio.sleep(): we verify that asyncio.gather() runs tasks concurrently by timing them. Three 0.1-second sleeps should complete in ~0.1s (parallel), not ~0.3s (sequential).
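
The timing check from the last bullet can be sketched as follows. The coroutine fake_agent is a hypothetical stand-in for an agent's review() call; the 0.25-second bound matches the one quoted in the test table.

```python
import asyncio
import time


async def fake_agent(delay: float) -> list[str]:
    await asyncio.sleep(delay)         # Simulates waiting on the LLM API
    return []


async def timed_gather() -> float:
    start = time.perf_counter()
    await asyncio.gather(fake_agent(0.1), fake_agent(0.1), fake_agent(0.1))
    return time.perf_counter() - start


elapsed = asyncio.run(timed_gather())
```

The bound is deliberately loose (0.25s rather than, say, 0.11s) so the test does not flake on a busy CI machine while still failing clearly if the tasks run sequentially.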


Files Created/Modified in Week 6

| File | Type | Purpose |
|---|---|---|
| app/context/embedder.py | New | Embedding pipeline: lazy model loading, embed_texts(), chunk_code() |
| app/context/indexer.py | New | ChromaDB indexer: collection-per-repo, upsert semantics, chunk limits |
| app/context/retriever.py | New | RAG retriever: similarity search, threshold filtering, context formatting |
| app/agents/base_agent.py | Modified | Added rag_context parameter to review() and {rag_context} to prompt template |
| app/main.py | Modified | Added RAG pipeline (index + retrieve) and asyncio.gather() for 3 parallel agents |
| tests/unit/test_rag_pipeline.py | New | 10 tests for chunking, indexing, retrieval |
| tests/unit/test_parallel_agents.py | New | 6 tests for agent identity and concurrent execution |

Dependencies Added

| Package | Purpose |
|---|---|
| sentence-transformers>=3.3.0 | Local embedding model (all-MiniLM-L6-v2, 22M params, 384 dims) |
| chromadb>=0.5.0 | In-memory vector database for storing and searching embeddings |

Architecture Patterns Used (Interview Reference)

| Pattern | Where Used | What It Means | Why It Matters |
|---|---|---|---|
| RAG (Retrieval-Augmented Generation) | embedder + indexer + retriever | External knowledge injected into LLM prompt | Agents see related code beyond the diff, reducing false positives |
| Lazy Loading | embedder.py (_model = None) | Resource initialized on first use, not at import time | Keeps imports and server startup fast; the model load (~2s cached, ~56s cold download) is paid only on first use |
| Singleton | embedder.py, indexer.py (_chroma_client) | Module-level global ensures exactly one instance | One embedding model, one ChromaDB client, no redundant memory |
| Fail-Open | retriever.py, main.py | If RAG fails, agents proceed without context | RAG is an enhancement, not a requirement; reviews still work without it |
| Concurrent Execution | main.py (asyncio.gather()) | Multiple I/O-bound tasks run on one thread cooperatively | ~2.6x latency reduction (total is the max of the agent times, not the sum) |
| Graceful Degradation | base_agent.py (return [] on error) | Failures return empty results instead of crashing | One agent failing doesn't kill the other agents' findings |
| Upsert Semantics | indexer.py (collection.upsert()) | Insert-or-update prevents duplicate entries | Re-indexing same file on re-review is idempotent |
| Input Sanitization | indexer.py (_collection_name()) | Clean external input before passing to storage | GitHub repo names contain characters ChromaDB rejects |
| Overlap Chunking | embedder.py (10-line overlap) | Adjacent chunks share boundary lines | A function that starts near the end of one chunk is not cut off in the next |

Key Interview Talking Points Summary

  1. RAG for Code Review: "RAG gives our agents 'peripheral vision' beyond the diff. When reviewing a database query change, RAG retrieves the DB wrapper class, validation middleware, and similar patterns from across the repository. We use sentence-transformers for local embeddings (no API cost, ~10ms per chunk) and ChromaDB as an embedded vector store (no infrastructure)."

  2. Embeddings: "We use all-MiniLM-L6-v2, a 22-million parameter model that produces 384-dimensional vectors. It runs on CPU in ~10ms per chunk, which is fast enough for real-time indexing during webhook processing. Unlike keyword search, embeddings capture semantic meaning: a query about 'database connection' matches code containing sqlite3.connect() even though the words are different."

  3. Chunking Strategy: "We chunk code into 60-line blocks with 10-line overlap. Sixty lines is roughly one function, the natural semantic unit of code. The overlap means code near a chunk boundary appears in both neighbouring chunks, so a function that starts in the last 10 lines of one chunk is not cut off in the next. We skip near-empty chunks to avoid polluting the vector store."

  4. ChromaDB Choice: "ChromaDB runs embedded in the Python process, so it needs zero infrastructure. We accept the trade-off of in-memory storage because Render's free tier has ephemeral disk, and rebuilding the index takes under 2 seconds for typical PRs. Each repo gets its own collection for isolation, and upsert semantics make re-indexing idempotent."

  5. Parallel Execution: "We run all three agents concurrently with asyncio.gather(). Since each agent is I/O-bound (waiting for the Groq API), asyncio's cooperative multitasking overlaps the wait times. Total latency is max(agent times), not the sum: a 2.6x speedup. Each agent handles exceptions internally, so one failure doesn't crash the others."

  6. Fail-Open Design: "Every component in the RAG pipeline can fail without crashing the system. If the embedding model fails to load, agents work without RAG context. If ChromaDB throws an error, the try/except in main.py catches it and continues. If one agent's LLM call times out, the other two agents' findings are still posted. We always prefer partial results over total failure."


Cumulative Test Count

| Week | New Tests | Cumulative Total |
|---|---|---|
| Week 1 | 8 (schema validation) | 8 |
| Week 2 | 12 (webhook + cache) | 20 |
| Week 3 | 15 (security agent + tools + formatter) | 35 |
| Week 4 | 8 (performance agent + radon) | 43 |
| Week 5 | 9 (style agent + ruff) | 52 |
| Week 6 | 16 (RAG pipeline + parallel agents) | 68 |

Documentation written 2026-03-20 as part of Week 6 completion.