from __future__ import annotations
import argparse
import asyncio
import os
from pathlib import Path
from typing import List, Optional, Tuple
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from dotenv import load_dotenv
import logfire
load_dotenv()
# =============================
# Logfire configuration for tracing
# =============================
logfire.configure(token=os.getenv("LOGFIRE_API_KEY"))
logfire.instrument_pydantic_ai()
# =============================
# Data models for structured output
# =============================
class Issue(BaseModel):
title: str = Field(..., description="Short, actionable issue title")
description: str = Field(..., description="Clear explanation, why it matters, and how to fix")
severity: str = Field(..., description="One of: low, medium, high, critical")
line: Optional[int] = Field(None, description="Line number if known/applicable")
rule: Optional[str] = Field(None, description="Optional rule or best practice identifier")
class FileReview(BaseModel):
file_path: str
summary: str
score: int = Field(..., ge=0, le=10, description="10 = excellent, 0 = very poor")
issues: List[Issue] = Field(default_factory=list)
suggestions: List[str] = Field(default_factory=list)
class CodeReviewResponse(BaseModel):
overall_summary: str
overall_score: int = Field(..., ge=0, le=10)
files: List[FileReview]
quick_actions: List[str] = Field(default_factory=list, description="Concise TODOs that can be applied immediately")
class DiffDeps(BaseModel):
diff: str
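# Illustrative only: the kind of structured review these models describe
# (all field values below are hypothetical, not produced by this module):
#
#   CodeReviewResponse(
#       overall_summary="Adds retry handling to the webhook client.",
#       overall_score=8,
#       files=[
#           FileReview(
#               file_path="agent/webhook.py",
#               summary="Retries transient HTTP failures.",
#               score=8,
#               issues=[Issue(title="No backoff cap", description="...", severity="medium")],
#           )
#       ],
#       quick_actions=["Cap retry backoff"],
#   )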
# =============================
# Agent definition (Pydantic AI)
# =============================
DEFAULT_MODEL = "google-gla:gemini-2.5-pro"
code_review_agent = Agent(
    model=DEFAULT_MODEL,
    deps_type=DiffDeps,
    output_type=str,
)
@code_review_agent.system_prompt
def system_prompt(ctx: RunContext[DiffDeps]) -> str:
return f"""
You are a code review agent focused on analyzing pull request changes and generating concise summary change logs.
## Input Processing
First, carefully examine the provided diff:
{ctx.deps.diff}
**Important**: Before proceeding with the review, check whether the diff contains any actual changes. Treat the PR as having no reviewable changes when:
- The diff is empty, contains only whitespace, or shows no meaningful modifications
- The diff indicates "no changes" or a similar status
- All changes are formatting/whitespace only, with no functional impact
## Your Role
- Review code changes in pull requests
- Generate clear, actionable summary change logs
- Focus solely on what changed and its impact
- Provide appropriate responses when no changes are detected
## What to Review
- **Code modifications**: Added, deleted, or modified lines
- **Functional changes**: New features, bug fixes, refactoring
- **Structural changes**: File additions/deletions, directory reorganization
- **Dependency updates**: Package changes, version bumps
- **Configuration changes**: Environment, build, or deployment configurations
## Response Logic
### If NO changes are detected:
Simply respond with:
```
No changes detected in this pull request.
```
### If changes ARE detected, include in summary:
1. **High-level overview**: Brief description of the PR's purpose
2. **Key changes**: List of main modifications made
3. **Files affected**: Count and types of files changed
4. **Impact assessment**: Brief note on potential effects
5. **Breaking changes**: Highlight any breaking changes prominently
## Output Format (for PRs with changes)
```
## Pull Request Summary
**Purpose**: [Brief description of what this PR accomplishes]
**Changes Made**:
- [Change 1 with file reference and brief description]
- [Change 2 with file reference and brief description]
- [Change 3 with file reference and brief description]
**Files Modified**: X files changed (+Y additions, -Z deletions)
**Breaking Changes**: [If any, list them here, otherwise state "None"]
**Impact**: [Brief assessment of the changes' significance and potential effects]
```
## What NOT to Focus On
- Code style preferences (unless specifically requested)
- Performance optimizations (unless critical)
- Architecture discussions
- Non-functional requirements
- Testing strategies (unless tests are part of the changes)
## Guidelines
- **Always check for actual changes first** - don't generate summaries for empty diffs
- Keep summaries concise but informative (aim for 3-5 bullet points maximum)
- Use clear, non-technical language when possible
- Highlight breaking changes prominently with clear warnings
- Focus on the "what" and the "impact", not the "how" or the "why"
- Maintain objectivity in descriptions
- Be specific about file types and locations when relevant
- If changes are minimal (e.g., only comments or whitespace), mention this explicitly
## Edge Cases
- **Empty diff**: Respond with "No changes detected in this pull request."
- **Only whitespace/formatting changes**: Mention this explicitly: "Only formatting/whitespace changes detected."
- **Very large diffs**: Focus on the most significant changes and note if summary is abbreviated
- **Binary files**: Note that binary files were changed but cannot be reviewed in detail
"""
def read_text_file(path: str) -> str:
"""Read a UTF-8 text file from disk and return its contents. Truncates very large files.
Args:
path: Absolute or relative path to a text file.
Returns:
File text content (possibly truncated to keep context size reasonable).
"""
file_path = Path(path)
if not file_path.exists() or not file_path.is_file():
raise FileNotFoundError(f"File not found: {path}")
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
except Exception as exc: # pragma: no cover - defensive
raise RuntimeError(f"Failed to read file: {path}: {exc}")
max_chars = 200_000
if len(text) > max_chars:
head = text[: max_chars // 2]
tail = text[-max_chars // 2 :]
return f"{head}\n\n... [truncated] ...\n\n{tail}"
return text
def list_code_files(
ctx: RunContext,
root: str,
include_extensions: List[str] | None = None,
exclude_dirs: List[str] | None = None,
max_files: int = 200,
) -> List[str]:
"""List code files under a directory.
Args:
root: Directory root to scan
include_extensions: e.g. [".py", ".ts", ".js", ".tsx", ".java"]. If omitted, uses a sensible default set.
exclude_dirs: Directory names to skip (e.g. ["node_modules", ".git", "dist", "build"]).
max_files: Upper bound on number of results to avoid huge contexts.
Returns:
List of file paths (strings) relative to the provided root where possible.
"""
root_path = Path(root)
if not root_path.exists():
raise FileNotFoundError(f"Root directory not found: {root}")
default_exts = [
".py",
".ts",
".tsx",
".js",
".jsx",
".java",
".kt",
".go",
".rs",
".rb",
".php",
".cs",
".cpp",
".cc",
".c",
".m",
".mm",
".sql",
".yml",
".yaml",
".toml",
".json",
".md",
]
exts = [e.lower() for e in (include_extensions or default_exts)]
excluded = set(exclude_dirs or {".git", "node_modules", "dist", "build", ".venv", "__pycache__"})
results: List[str] = []
    for path in root_path.rglob("*"):
        # rglob still yields paths inside excluded directories, so skip any path
        # that has an excluded directory anywhere in its parts.
        if any(part in excluded for part in path.parts):
            continue
        if path.is_dir():
            continue
        if path.suffix.lower() in exts:
            results.append(str(path))
            if len(results) >= max_files:
                break
return results
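# Note: the user prompt built in build_user_prompt below tells the model to call
# read_text_file and list_code_files as tools, but neither function is registered
# on an agent in this module. A minimal sketch of how they could be exposed
# (the review_agent name and wiring are assumptions, not part of this file):
#
#   review_agent = Agent(model=DEFAULT_MODEL, output_type=CodeReviewResponse)
#   review_agent.tool_plain(read_text_file)  # plain tool: no RunContext parameter
#   review_agent.tool(list_code_files)       # context tool: first parameter is RunContext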
# =============================
# Utilities
# =============================
def gather_targets(paths: List[str]) -> Tuple[List[str], List[str]]:
"""Split inputs into files and directories; expand files list from directories.
Returns (files, dirs)
"""
files: List[str] = []
dirs: List[str] = []
for p in paths:
path = Path(p)
if path.is_file():
files.append(str(path))
elif path.is_dir():
dirs.append(str(path))
else:
# Ignore non-existent
continue
return files, dirs
def build_user_prompt(
files: List[str],
dirs: List[str],
focus_areas: List[str],
max_inline_chars: int = 60_000,
) -> str:
"""Create a concise instruction for the agent, listing files and review goals.
We do not inline large file contents; the agent can use tools to load them on demand.
Small files may be inlined to reduce tool calls.
"""
focus_text = ", ".join(focus_areas) if focus_areas else "general quality"
# Try to inline very small files to prime the context
inline_blobs: List[str] = []
inlined_total = 0
for f in files:
try:
text = Path(f).read_text(encoding="utf-8", errors="ignore")
except Exception:
continue
if len(text) <= 8_000 and (inlined_total + len(text)) <= max_inline_chars:
inline_blobs.append(f"File: {f}\n\n{text}")
inlined_total += len(text)
file_list_section = "\n".join(f"- {p}" for p in files)
dir_list_section = "\n".join(f"- {d}" for d in dirs)
inline_section = ("\n\n" + "\n\n".join(inline_blobs)) if inline_blobs else ""
return (
"Perform a comprehensive code review for the repository subset below.\n\n"
f"Focus areas: {focus_text}.\n\n"
"Files:\n" + file_list_section + "\n\n"
+ ("Directories (you may list and inspect files using the provided tools):\n" + dir_list_section + "\n\n" if dirs else "")
+ "Use the read_text_file and list_code_files tools to fetch any file content you need.\n"
+ inline_section
)
def render_markdown(result: CodeReviewResponse) -> str:
"""Render a human-readable Markdown report from the structured output."""
lines: List[str] = []
lines.append("# Code Review Report")
lines.append("")
lines.append(f"Overall Score: {result.overall_score}/10")
lines.append("")
lines.append(result.overall_summary)
lines.append("")
for f in result.files:
lines.append(f"## {f.file_path} — Score: {f.score}/10")
lines.append("")
if f.summary:
lines.append(f.summary)
lines.append("")
if f.issues:
lines.append("### Issues")
            for issue in f.issues:
where = f" (line {issue.line})" if issue.line is not None else ""
rule = f" — {issue.rule}" if issue.rule else ""
lines.append(f"- [{issue.severity.upper()}]{where}{rule}: {issue.title}")
lines.append(f" - {issue.description}")
lines.append("")
if f.suggestions:
lines.append("### Suggestions")
for s in f.suggestions:
lines.append(f"- {s}")
lines.append("")
if result.quick_actions:
lines.append("## Quick Actions")
for qa in result.quick_actions:
lines.append(f"- {qa}")
lines.append("")
return "\n".join(lines)
# =============================
# Public API
# =============================
async def review_paths(
paths: List[str],
focus_areas: Optional[List[str]] = None,
model: Optional[str] = None,
) -> CodeReviewResponse:
    # Trace the full review run with logfire.
    with logfire.span(
        "review_paths",
        paths_count=len(paths),
        focus_areas=focus_areas or [],
        model=model or "default",
    ) as span:
        try:
            files, dirs = gather_targets(paths)
            logfire.info("files_analyzed", files_count=len(files), directories_count=len(dirs))
            # code_review_agent only emits a plain-string diff summary, so build a
            # structured-output agent for path-based reviews.
            agent = Agent(
                model=model or DEFAULT_MODEL,
                output_type=CodeReviewResponse,
            )
            user_prompt = build_user_prompt(files, dirs, focus_areas or [])
            logfire.info("prompt_generated", prompt_length=len(user_prompt))
            run = await agent.run(user_prompt)
            logfire.info(
                "review_completed",
                overall_score=run.output.overall_score,
                files_reviewed=len(run.output.files),
            )
            return run.output
        except Exception as exc:
            # Record the failure on the span before re-raising.
            span.record_exception(exc)
            raise
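# Example usage (hypothetical paths and focus areas):
#   result = asyncio.run(review_paths(["agent/"], focus_areas=["security", "readability"]))
#   print(render_markdown(result))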
async def review_code_string(
code: str,
filename: str = "snippet",
focus_areas: Optional[List[str]] = None,
model: Optional[str] = None,
) -> CodeReviewResponse:
    # Trace the snippet review with logfire.
    with logfire.span(
        "review_code_string",
        filename=filename,
        code_length=len(code),
        focus_areas=focus_areas or [],
        model=model or "default",
    ) as span:
        try:
            # code_review_agent only emits a plain-string diff summary, so build a
            # structured-output agent for snippet reviews.
            agent = Agent(
                model=model or DEFAULT_MODEL,
                output_type=CodeReviewResponse,
            )
            prompt = (
                f"Review the following code ({filename}).\n\n"
                f"Focus areas: {', '.join(focus_areas or []) or 'general quality'}.\n\n"
                f"{code}"
            )
            logfire.info("prompt_generated", prompt_length=len(prompt))
            run = await agent.run(prompt)
            logfire.info(
                "review_completed",
                overall_score=run.output.overall_score,
                files_reviewed=len(run.output.files),
            )
            return run.output
        except Exception as exc:
            # Record the failure on the span before re-raising.
            span.record_exception(exc)
            raise
# =============================
# CLI
# =============================
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run code review agent using pydantic-ai")
parser.add_argument(
"paths",
nargs="*",
help="Files or directories to review",
)
parser.add_argument(
"--focus",
nargs="*",
default=[],
help="Optional focus areas, e.g. security performance readability accessibility",
)
parser.add_argument(
"--model",
default=None,
help="Model id, e.g. openai:gpt-4o or openai:gpt-4o-mini (defaults to env CODE_REVIEW_MODEL or gpt-4o-mini)",
)
parser.add_argument(
"--out",
default=None,
help="If provided, write a Markdown report to this file",
)
return parser.parse_args(argv)
def main(argv: Optional[List[str]] = None) -> int:
    # Trace the CLI run with logfire.
    with logfire.span("cli_execution", argv=argv or []) as span:
        try:
            args = parse_args(argv)
            if not args.paths:
                logfire.info("no_paths_provided")
                print("No input paths provided. Nothing to review.")
                return 2
            logfire.info(
                "cli_args_parsed",
                paths_count=len(args.paths),
                focus_areas=args.focus,
                model=args.model,
                output_file=args.out,
            )
            result = asyncio.run(review_paths(args.paths, focus_areas=args.focus, model=args.model))
            md = render_markdown(result)
            if args.out:
                Path(args.out).write_text(md, encoding="utf-8")
                logfire.info("report_saved", output_file=args.out)
                print(f"Saved review report to {args.out}")
            else:
                logfire.info("report_printed_to_console")
                print(md)
            logfire.info("cli_completed_successfully")
            return 0
        except Exception as exc:
            # Record the failure on the span before re-raising.
            span.record_exception(exc)
            raise
if __name__ == "__main__":
    # Ad-hoc smoke test: summarize the diff stored in DIFF.md.
    diff = read_text_file("DIFF.md")
    result = code_review_agent.run_sync("", deps=DiffDeps(diff=diff))
    print(result.output)
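# The argparse CLI defined above is not wired into __main__. A sketch of how it
# could be invoked instead of the smoke test (assuming review_paths is the
# desired entry point):
#
#   if __name__ == "__main__":
#       raise SystemExit(main())
#
# and then from a shell:
#   python code_review.py src/ --focus security readability --out review.md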