"""
Base Agent Interface
=====================

All domain agents (Security, Performance, Style) inherit from this base class.
It provides shared infrastructure:

1. **Groq LLM client**: ChatGroq configured with Llama-3.3-70B
2. **Structured output**: the LLM returns typed Finding objects, not raw text
3. **Error handling**: graceful fallback if the LLM call fails
4. **Timing**: measures how long each agent takes (for latency metrics)
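
A minimal, stdlib-only sketch of items 3 and 4. The names `timed_call`,
`failing_llm_call` and `working_llm_call` are hypothetical and not part of
this module; the wrapper times an async call and returns an empty list
instead of raising.

```python
import asyncio
import time


async def timed_call(coro_fn):
    # Hypothetical helper: await an async callable, time it, and fall back
    # to an empty list if it raises.
    start = time.perf_counter()
    try:
        result = await coro_fn()
    except Exception:
        result = []  # graceful fallback instead of crashing the caller
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    return result, elapsed_ms


async def failing_llm_call():
    # Stand-in for an LLM request that fails.
    raise RuntimeError("simulated API failure")


async def working_llm_call():
    # Stand-in for an LLM request that succeeds.
    return ["finding"]


failed_result, failed_ms = asyncio.run(timed_call(failing_llm_call))
ok_result, ok_ms = asyncio.run(timed_call(working_llm_call))
```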

Design pattern: Template Method
- The base class defines the algorithm skeleton (receive diff → run tools → call LLM → return findings)
- Subclasses override specific steps (system_prompt, run_static_analysis)
- This prevents code duplication across 3 agents that follow the same flow
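
The pattern can be sketched with a toy hierarchy. All names here
(`SketchAgent`, `EchoAgent`, `tool_output`) are hypothetical and exist only
for this illustration; in this module the skeleton is `BaseAgent.review()`.

```python
from abc import ABC, abstractmethod


class SketchAgent(ABC):
    # Hypothetical stand-in for the base class.

    @property
    @abstractmethod
    def system_prompt(self) -> str:
        ...

    def tool_output(self, diff: str) -> str:
        # Optional hook: subclasses may override; default is no tool output.
        return ""

    def review(self, diff: str) -> list[str]:
        # Template method: the order of steps is fixed here, and subclasses
        # only supply the hooks.
        tools = self.tool_output(diff)
        return [self.system_prompt, diff, tools]


class EchoAgent(SketchAgent):
    @property
    def system_prompt(self) -> str:
        return "You are a style reviewer."

    def tool_output(self, diff: str) -> str:
        return "lint: ok"


steps = EchoAgent().review("+ x = 1")
```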

Why LangChain?
- Provides a unified interface across LLM providers (Groq, Gemini, OpenAI)
- If Groq goes down, we swap to Gemini by changing one line
- Structured output parsing is built in (with_structured_output)
- Prompt templates with variable substitution
"""

from __future__ import annotations

import time
from abc import ABC, abstractmethod

import structlog
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from pydantic import BaseModel, Field

from app.config import settings
from app.github.client import PRData
from app.models.findings import Finding

logger = structlog.get_logger()


class AgentFindings(BaseModel):
    """
    Schema for the LLM's structured output.

    By wrapping findings in a Pydantic model, we can use LangChain's
    `with_structured_output()` which constrains the LLM to return
    valid JSON matching this exact schema. No more parsing raw text!

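    Roughly how with_structured_output() works (details depend on the
    provider integration and the `method` argument):
    1. It passes this model's JSON schema to the LLM, typically as a
       tool/function definition or via JSON mode
    2. It parses the model's response back into JSON
    3. It validates the result against the schema and returns an
       AgentFindings instance
    4. If parsing or validation fails, it raises; retries are not automatic
       and must be added explicitly (e.g., with .with_retry())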
    """

    findings: list[FindingOutput] = Field(
        default_factory=list,
        description="List of security/performance/style findings",
    )


class FindingOutput(BaseModel):
    """
    The schema we ask the LLM to produce for each finding.

    This is slightly different from our internal Finding model because:
    - The LLM doesn't know which agent it is (we add that after)
    - We give the LLM freedom on field names that match its training
    - We validate and convert to our Finding model post-LLM

    Note: This class is defined AFTER AgentFindings, which references it.
    That works because `from __future__ import annotations` defers annotation
    evaluation, and AgentFindings.model_rebuild() (called below) resolves the
    forward reference once this class exists.
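
    A stdlib-only analogue of resolving a forward reference, making no claims
    about Pydantic internals. `Wrapper` and `Item` are hypothetical names. The
    annotation is written as an explicit string here, which is also how
    `from __future__ import annotations` stores every annotation.

    ```python
    import typing
    from dataclasses import dataclass, field


    @dataclass
    class Wrapper:
        # Item is not defined yet, so the annotation is kept as a string.
        items: "list[Item]" = field(default_factory=list)


    @dataclass
    class Item:
        title: str


    # Resolving the hints now succeeds because Item exists.
    raw_annotation = Wrapper.__annotations__["items"]
    hints = typing.get_type_hints(Wrapper, globalns=globals())
    ```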
    """

    file_path: str = Field(description="Path to the file (e.g., 'app.py')")
    line_start: int = Field(description="Starting line number of the issue")
    line_end: int = Field(description="Ending line number of the issue")
    severity: str = Field(description="One of: critical, high, medium, low")
    category: str = Field(description="Issue category (e.g., 'sql_injection', 'hardcoded_secret')")
    title: str = Field(description="Short one-line title of the finding")
    description: str = Field(description="Detailed explanation of the issue and its impact")
    suggested_fix: str = Field(default="", description="Corrected code snippet")
    cwe_id: str | None = Field(default=None, description="CWE ID if applicable (e.g., 'CWE-89')")
    confidence: float = Field(description="Confidence score from 0.0 to 1.0")


# Rebuild the model to resolve the forward reference
AgentFindings.model_rebuild()


class BaseAgent(ABC):
    """
    Abstract base class for all domain agents.

    Subclasses must implement:
    - agent_name: which agent this is ("security", "performance", "style")
    - system_prompt: the detailed system prompt for the LLM

    Subclasses may override:
    - run_static_analysis(): optional static tools (Bandit, Semgrep, etc.)

    Usage:
        agent = SecurityAgent()
        findings = await agent.review(pr_data)
    """

    def __init__(self):
        """
        Initialize the LLM client.

        ChatGroq connects to Groq's API, which serves Llama-3.3-70B at
        500+ tokens/sec, among the fastest hosted inference for open-weight
        models. This speed is critical: we need each agent to complete in
        3-8 seconds so the full review stays under 15 seconds.

        Temperature=0.1: We want nearly deterministic output. Code review
        should be consistent: the same code should get the same findings.
        A small temperature (not 0) allows slight variation to avoid
        getting stuck in repetitive patterns.
        """
        self.llm = ChatGroq(
            model="llama-3.3-70b-versatile",
            api_key=settings.groq_api_key,
            temperature=0.1,
            max_tokens=4096,
        )

    @property
    @abstractmethod
    def agent_name(self) -> str:
        """The agent identifier: 'security', 'performance', or 'style'."""
        ...

    @property
    @abstractmethod
    def system_prompt(self) -> str:
        """The full system prompt for this agent."""
        ...

    async def run_static_analysis(self, pr_data: PRData) -> str:
        """
        Run static analysis tools on the PR files.

        Override in subclasses to run agent-specific tools:
        - SecurityAgent: Bandit + detect-secrets
        - PerformanceAgent: radon + AST analysis
        - StyleAgent: Ruff/pylint

        Returns a string summary of tool findings to include in the LLM prompt.
        Default: no static analysis (LLM-only review).
        """
        return ""

    def _build_prompt(self) -> ChatPromptTemplate:
        """
        Build the LangChain prompt template.

        ChatPromptTemplate.from_messages() creates a two-message prompt:
        - ("system", ...) → the system message (agent persona + instructions)
        - ("human", ...) → the user message (the actual PR data to review)

        Variables in {curly_braces} are substituted at runtime when the chain
        is invoked with .ainvoke(). Literal braces in system_prompt must be
        escaped as {{ and }}, or they are treated as template variables.
        """
        return ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            ("human", (
                "## PR Diff\n"
                "```diff\n{diff}\n```\n\n"
                "## Changed File Contents\n"
                "{file_contents}\n\n"
                "## Static Analysis Results\n"
                "{static_analysis}\n\n"
                "{rag_context}\n\n"
                "Analyze this PR and return your findings as structured JSON."
            )),
        ])

    def _convert_to_findings(self, agent_output: AgentFindings) -> list[Finding]:
        """
        Convert the LLM's output to our internal Finding model.

        This adds the agent_name field and validates/clamps values:
        - Severity is lowercased and validated
        - Confidence is clamped to [0.0, 1.0]
        - Invalid findings are skipped (not crashed on)
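
        A minimal sketch of the severity and confidence rules above, with a
        hypothetical helper `normalize` that is not part of this class:

        ```python
        VALID_SEVERITIES = ("critical", "high", "medium", "low")


        def normalize(severity: str, confidence: float) -> tuple[str, float]:
            # Hypothetical helper mirroring the rules listed above.
            cleaned = severity.lower().strip()
            if cleaned not in VALID_SEVERITIES:
                cleaned = "medium"  # fallback for unrecognized labels
            clamped = max(0.0, min(1.0, confidence))
            return cleaned, clamped


        high = normalize("  HIGH ", 1.7)
        unknown = normalize("blocker", -0.2)
        ```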
        """
        findings = []
        for f in agent_output.findings:
            try:
                severity = f.severity.lower().strip()
                if severity not in ("critical", "high", "medium", "low"):
                    severity = "medium"  # Default for ambiguous severity

                confidence = max(0.0, min(1.0, f.confidence))

                finding = Finding(
                    agent=self.agent_name,
                    file_path=f.file_path,
                    line_start=f.line_start,
                    line_end=f.line_end,
                    severity=severity,
                    category=f.category,
                    title=f.title,
                    description=f.description,
                    suggested_fix=f.suggested_fix,
                    cwe_id=f.cwe_id,
                    confidence=confidence,
                )
                findings.append(finding)
            except Exception as e:
                logger.warning(
                    "Skipping malformed finding",
                    agent=self.agent_name,
                    error=str(e),
                )
        return findings

    def _format_file_contents(self, file_contents: dict[str, str]) -> str:
        """
        Format file contents for the LLM prompt.

        Each file is wrapped in a code block with its path as a header.
        We truncate very long files to stay within LLM context limits.
        Llama-3.3-70B on Groq has a 128K-token context window, so there is
        plenty of room for typical PRs, but we cap each file at 500 lines to
        be safe.
        """
        parts = []
        for filepath, content in file_contents.items():
            lines = content.split("\n")
            if len(lines) > 500:
                content = "\n".join(lines[:500]) + "\n... (truncated)"
            parts.append(f"### {filepath}\n```\n{content}\n```")
        return "\n\n".join(parts) if parts else "No file contents available."

    async def review(self, pr_data: PRData, rag_context: str = "") -> list[Finding]:
        """
        Main entry point: review a PR and return findings.

        This is the Template Method:
        1. Run static analysis tools (subclass-specific)
        2. Build the prompt with diff + files + tool output + RAG context
        3. Call the LLM with structured output
        4. Convert to Finding objects
        5. Log timing and return

        If the LLM call fails, we return an empty list rather than crashing
        the entire pipeline. The other agents can still contribute findings.

        Args:
            pr_data: The PR diff, file contents, and metadata
            rag_context: Optional RAG context from ChromaDB (related code chunks)
        """
        start_time = time.time()

        try:
            # Step 1: Run static analysis tools
            static_results = await self.run_static_analysis(pr_data)

            # Step 2: Build the prompt
            prompt = self._build_prompt()

            # Step 3: Create the structured output chain
            structured_llm = self.llm.with_structured_output(AgentFindings)
            chain = prompt | structured_llm

            # Step 4: Call the LLM
            result = await chain.ainvoke({
                "diff": pr_data.diff[:15000],  # Cap diff size for token limits
                "file_contents": self._format_file_contents(pr_data.file_contents),
                "static_analysis": static_results or "No static analysis results.",
                "rag_context": rag_context or "",
            })

            # Step 5: Convert to Finding objects
            findings = self._convert_to_findings(result)

            elapsed_ms = int((time.time() - start_time) * 1000)
            logger.info(
                "Agent review completed",
                agent=self.agent_name,
                findings_count=len(findings),
                elapsed_ms=elapsed_ms,
            )

            return findings

        except Exception as e:
            elapsed_ms = int((time.time() - start_time) * 1000)
            logger.error(
                "Agent review failed",
                agent=self.agent_name,
                error=str(e),
                elapsed_ms=elapsed_ms,
            )
            return []  # Don't crash the pipeline; other agents can still work