from __future__ import annotations

import argparse
import asyncio
import os
from pathlib import Path
from typing import List, Optional, Tuple

from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
import logfire

load_dotenv()

# =============================
# Logfire configuration for tracing
# =============================
logfire.configure(token=os.getenv("LOGFIRE_API_KEY"))
logfire.instrument_pydantic_ai()


# =============================
# Data models for structured output
# =============================
class Issue(BaseModel):
    title: str = Field(..., description="Short, actionable issue title")
    description: str = Field(..., description="Clear explanation, why it matters, and how to fix")
    severity: str = Field(..., description="One of: low, medium, high, critical")
    line: Optional[int] = Field(None, description="Line number if known/applicable")
    rule: Optional[str] = Field(None, description="Optional rule or best practice identifier")


class FileReview(BaseModel):
    file_path: str
    summary: str
    score: int = Field(..., ge=0, le=10, description="10 = excellent, 0 = very poor")
    issues: List[Issue] = Field(default_factory=list)
    suggestions: List[str] = Field(default_factory=list)


class CodeReviewResponse(BaseModel):
    overall_summary: str
    overall_score: int = Field(..., ge=0, le=10)
    files: List[FileReview]
    quick_actions: List[str] = Field(
        default_factory=list,
        description="Concise TODOs that can be applied immediately",
    )


class DiffDeps(BaseModel):
    diff: str
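
# Illustrative only: a helper showing the shape of the structured review that the
# file-review flow further below is expected to produce. All field values are made
# up, and nothing else in the module calls this function.
def _example_code_review_response() -> CodeReviewResponse:
    return CodeReviewResponse(
        overall_summary="Small, focused change set with one medium-severity issue.",
        overall_score=7,
        files=[
            FileReview(
                file_path="app/main.py",
                summary="Adds a CLI entry point for the review agent.",
                score=7,
                issues=[
                    Issue(
                        title="Missing input validation",
                        description=(
                            "User-supplied paths are not validated before use; "
                            "validate them to fail fast with a clear error."
                        ),
                        severity="medium",
                        line=42,
                    )
                ],
                suggestions=["Validate CLI paths before reading them."],
            )
        ],
        quick_actions=["Add path validation in app/main.py"],
    )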

# =============================
# Agent definition (Pydantic AI)
# =============================
DEFAULT_MODEL = "google-gla:gemini-2.5-pro"

code_review_agent = Agent(
    model=DEFAULT_MODEL,
    deps_type=DiffDeps,
    output_type=str,
)


@code_review_agent.system_prompt
def system_prompt(ctx: RunContext[DiffDeps]) -> str:
    return f"""
You are a code review agent focused on analyzing pull request changes and generating concise summary change logs.

## Input Processing
First, carefully examine the provided diff:

{ctx.deps.diff}

**Important**: Before proceeding with the review, check if the diff contains any actual changes:
- If the diff is empty, contains only whitespace, or shows no meaningful modifications
- If the diff indicates "no changes" or similar status
- If all changes are just formatting/whitespace without functional impact

## Your Role
- Review code changes in pull requests
- Generate clear, actionable summary change logs
- Focus solely on what changed and its impact
- Provide appropriate responses when no changes are detected

## What to Review
- **Code modifications**: Added, deleted, or modified lines
- **Functional changes**: New features, bug fixes, refactoring
- **Structural changes**: File additions/deletions, directory reorganization
- **Dependency updates**: Package changes, version bumps
- **Configuration changes**: Environment, build, or deployment configurations

## Response Logic

### If NO changes are detected:
Simply respond with:
```
No changes detected in this pull request.
```

### If changes ARE detected, include in the summary:
1. **High-level overview**: Brief description of the PR's purpose
2. **Key changes**: List of main modifications made
3. **Files affected**: Count and types of files changed
4. **Impact assessment**: Brief note on potential effects
5. **Breaking changes**: Highlight any breaking changes prominently

## Output Format (for PRs with changes)
```
## Pull Request Summary

**Purpose**: [Brief description of what this PR accomplishes]

**Changes Made**:
- [Change 1 with file reference and brief description]
- [Change 2 with file reference and brief description]
- [Change 3 with file reference and brief description]

**Files Modified**: X files changed (+Y additions, -Z deletions)

**Breaking Changes**: [If any, list them here; otherwise state "None"]

**Impact**: [Brief assessment of the changes' significance and potential effects]
```

## What NOT to Focus On
- Code style preferences (unless specifically requested)
- Performance optimizations (unless critical)
- Architecture discussions
- Non-functional requirements
- Testing strategies (unless tests are part of the changes)

## Guidelines
- **Always check for actual changes first** - don't generate summaries for empty diffs
- Keep summaries concise but informative (aim for 3-5 bullet points maximum)
- Use clear, non-technical language when possible
- Highlight breaking changes prominently with clear warnings
- Focus on the "what" and "impact", not the "how" or "why"
- Maintain objectivity in descriptions
- Be specific about file types and locations when relevant
- If changes are minimal (e.g., only comments or whitespace), mention this explicitly

## Edge Cases
- **Empty diff**: Respond with "No changes detected in this pull request."
- **Only whitespace/formatting changes**: Mention this explicitly: "Only formatting/whitespace changes detected."
- **Very large diffs**: Focus on the most significant changes and note if the summary is abbreviated
- **Binary files**: Note that binary files were changed but cannot be reviewed in detail
"""


def read_text_file(path: str) -> str:
    """Read a UTF-8 text file from disk and return its contents.

    Very large files are truncated to keep the context size reasonable.

    Args:
        path: Absolute or relative path to a text file.

    Returns:
        File text content (possibly truncated).
    """
    file_path = Path(path)
    if not file_path.exists() or not file_path.is_file():
        raise FileNotFoundError(f"File not found: {path}")
    try:
        text = file_path.read_text(encoding="utf-8", errors="ignore")
    except Exception as exc:  # pragma: no cover - defensive
        raise RuntimeError(f"Failed to read file: {path}: {exc}") from exc

    max_chars = 200_000
    if len(text) > max_chars:
        head = text[: max_chars // 2]
        tail = text[-max_chars // 2 :]
        return f"{head}\n\n... [truncated] ...\n\n{tail}"
    return text


def list_code_files(
    ctx: RunContext,
    root: str,
    include_extensions: List[str] | None = None,
    exclude_dirs: List[str] | None = None,
    max_files: int = 200,
) -> List[str]:
    """List code files under a directory.

    Args:
        root: Directory root to scan.
        include_extensions: e.g. [".py", ".ts", ".js", ".tsx", ".java"]. If omitted,
            a sensible default set is used.
        exclude_dirs: Directory names to skip (e.g. ["node_modules", ".git", "dist", "build"]).
        max_files: Upper bound on the number of results to avoid huge contexts.

    Returns:
        List of file paths (strings), relative to the provided root where possible.
    """
    root_path = Path(root)
    if not root_path.exists():
        raise FileNotFoundError(f"Root directory not found: {root}")

    default_exts = [
        ".py", ".ts", ".tsx", ".js", ".jsx", ".java", ".kt", ".go", ".rs", ".rb",
        ".php", ".cs", ".cpp", ".cc", ".c", ".m", ".mm", ".sql", ".yml", ".yaml",
        ".toml", ".json", ".md",
    ]
    exts = [e.lower() for e in (include_extensions or default_exts)]
    excluded = set(exclude_dirs or {".git", "node_modules", "dist", "build", ".venv", "__pycache__"})

    results: List[str] = []
    for path in root_path.rglob("*"):
        # rglob cannot prune directories, so skip any path that sits inside an
        # excluded directory.
        if any(part in excluded for part in path.parts):
            continue
        if path.is_dir():
            continue
        if path.suffix.lower() in exts:
            results.append(str(path))
            if len(results) >= max_files:
                break
    return results
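
# Illustrative only: the helpers above can also be exercised directly, outside of
# any agent run, to preview what the review agent would be allowed to read:
#
#     for p in list_code_files(None, ".", max_files=10):  # ctx is unused here
#         print(p, len(read_text_file(p)))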
""" root_path = Path(root) if not root_path.exists(): raise FileNotFoundError(f"Root directory not found: {root}") default_exts = [ ".py", ".ts", ".tsx", ".js", ".jsx", ".java", ".kt", ".go", ".rs", ".rb", ".php", ".cs", ".cpp", ".cc", ".c", ".m", ".mm", ".sql", ".yml", ".yaml", ".toml", ".json", ".md", ] exts = [e.lower() for e in (include_extensions or default_exts)] excluded = set(exclude_dirs or {".git", "node_modules", "dist", "build", ".venv", "__pycache__"}) results: List[str] = [] for path in root_path.rglob("*"): if path.is_dir(): if path.name in excluded: # Skip excluded directories entirely # Use try/except to ignore permission errors try: # Prevent descending further continue finally: ... continue if path.suffix.lower() in exts: results.append(str(path)) if len(results) >= max_files: break return results # ============================= # Utilities # ============================= def gather_targets(paths: List[str]) -> Tuple[List[str], List[str]]: """Split inputs into files and directories; expand files list from directories. Returns (files, dirs) """ files: List[str] = [] dirs: List[str] = [] for p in paths: path = Path(p) if path.is_file(): files.append(str(path)) elif path.is_dir(): dirs.append(str(path)) else: # Ignore non-existent continue return files, dirs def build_user_prompt( files: List[str], dirs: List[str], focus_areas: List[str], max_inline_chars: int = 60_000, ) -> str: """Create a concise instruction for the agent, listing files and review goals. We do not inline large file contents; the agent can use tools to load them on demand. Small files may be inlined to reduce tool calls. """ focus_text = ", ".join(focus_areas) if focus_areas else "general quality" # Try to inline very small files to prime the context inline_blobs: List[str] = [] inlined_total = 0 for f in files: try: text = Path(f).read_text(encoding="utf-8", errors="ignore") except Exception: continue if len(text) <= 8_000 and (inlined_total + len(text)) <= max_inline_chars: inline_blobs.append(f"File: {f}\n\n{text}") inlined_total += len(text) file_list_section = "\n".join(f"- {p}" for p in files) dir_list_section = "\n".join(f"- {d}" for d in dirs) inline_section = ("\n\n" + "\n\n".join(inline_blobs)) if inline_blobs else "" return ( "Perform a comprehensive code review for the repository subset below.\n\n" f"Focus areas: {focus_text}.\n\n" "Files:\n" + file_list_section + "\n\n" + ("Directories (you may list and inspect files using the provided tools):\n" + dir_list_section + "\n\n" if dirs else "") + "Use the read_text_file and list_code_files tools to fetch any file content you need.\n" + inline_section ) def render_markdown(result: CodeReviewResponse) -> str: """Render a human-readable Markdown report from the structured output.""" lines: List[str] = [] lines.append("# Code Review Report") lines.append("") lines.append(f"Overall Score: {result.overall_score}/10") lines.append("") lines.append(result.overall_summary) lines.append("") for f in result.files: lines.append(f"## {f.file_path} — Score: {f.score}/10") lines.append("") if f.summary: lines.append(f.summary) lines.append("") if f.issues: lines.append("### Issues") for idx, issue in enumerate(f.issues, start=1): where = f" (line {issue.line})" if issue.line is not None else "" rule = f" — {issue.rule}" if issue.rule else "" lines.append(f"- [{issue.severity.upper()}]{where}{rule}: {issue.title}") lines.append(f" - {issue.description}") lines.append("") if f.suggestions: lines.append("### Suggestions") for s in f.suggestions: lines.append(f"- 
{s}") lines.append("") if result.quick_actions: lines.append("## Quick Actions") for qa in result.quick_actions: lines.append(f"- {qa}") lines.append("") return "\n".join(lines) # ============================= # Public API # ============================= async def review_paths( paths: List[str], focus_areas: Optional[List[str]] = None, model: Optional[str] = None, ) -> CodeReviewResponse: # Start tracing span with logfire_client.span("review_paths", attributes={ "paths_count": len(paths), "focus_areas": focus_areas or [], "model": model or "default" }) as span: try: files, dirs = gather_targets(paths) # Log file analysis span.add_event("files_analyzed", attributes={ "files_count": len(files), "directories_count": len(dirs) }) agent = code_review_agent if model is None else Agent( model=model, result_model=CodeReviewResponse, system_prompt=systemt_prompt, # Use the system prompt directly ) user_prompt = build_user_prompt(files, dirs, focus_areas or []) # Log prompt generation span.add_event("prompt_generated", attributes={ "prompt_length": len(user_prompt) }) run = await agent.run(user_prompt) # Log successful completion span.add_event("review_completed", attributes={ "overall_score": run.data.overall_score, "files_reviewed": len(run.data.files) }) return run.data except Exception as e: # Log error span.record_exception(e) span.set_status(logfire.StatusCode.ERROR, str(e)) raise async def review_code_string( code: str, filename: str = "snippet", focus_areas: Optional[List[str]] = None, model: Optional[str] = None, ) -> CodeReviewResponse: # Start tracing span with logfire_client.span("review_code_string", attributes={ "filename": filename, "code_length": len(code), "focus_areas": focus_areas or [], "model": model or "default" }) as span: try: agent = code_review_agent if model is None else Agent( model=model, result_model=CodeReviewResponse, system_prompt=systemt_prompt, # Use the system prompt directly ) prompt = ( f"Review the following code ({filename}).\n\n" # noqa: E501 f"Focus areas: {', '.join(focus_areas or []) or 'general quality'}.\n\n" f"{code}" ) # Log prompt generation span.add_event("prompt_generated", attributes={ "prompt_length": len(prompt) }) run = await agent.run(prompt) # Log successful completion span.add_event("review_completed", attributes={ "overall_score": run.data.overall_score, "files_reviewed": len(run.data.files) }) return run.data except Exception as e: # Log error span.record_exception(e) span.set_status(logfire.StatusCode.ERROR, str(e)) raise # ============================= # CLI # ============================= def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="Run code review agent using pydantic-ai") parser.add_argument( "paths", nargs="*", help="Files or directories to review", ) parser.add_argument( "--focus", nargs="*", default=[], help="Optional focus areas, e.g. security performance readability accessibility", ) parser.add_argument( "--model", default=None, help="Model id, e.g. openai:gpt-4o or openai:gpt-4o-mini (defaults to env CODE_REVIEW_MODEL or gpt-4o-mini)", ) parser.add_argument( "--out", default=None, help="If provided, write a Markdown report to this file", ) return parser.parse_args(argv) def main(argv: Optional[List[str]] = None) -> int: # Start tracing span for CLI execution with logfire_client.span("cli_execution", attributes={ "argv": argv or [] }) as span: try: args = parse_args(argv) if not args.paths: span.add_event("no_paths_provided") print("No input paths provided. 
Nothing to review.") return 2 # Log CLI arguments span.add_event("cli_args_parsed", attributes={ "paths_count": len(args.paths), "focus_areas": args.focus, "model": args.model, "output_file": args.out }) result = asyncio.run(review_paths(args.paths, focus_areas=args.focus, model=args.model)) md = render_markdown(result) if args.out: Path(args.out).write_text(md, encoding="utf-8") span.add_event("report_saved", attributes={"output_file": args.out}) print(f"Saved review report to {args.out}") else: span.add_event("report_printed_to_console") print(md) span.add_event("cli_completed_successfully") return 0 except Exception as e: # Log error span.record_exception(e) span.set_status(logfire.StatusCode.ERROR, str(e)) raise if __name__ == "__main__": # Start tracing span for test execution # with logfire_client.span("test_execution", attributes={ # "test_type": "diff_review" # }) as span: # try: # path = "DIFF.md" # span.add_event("reading_diff_file", attributes={"file_path": path}) # data = read_text_file(path) # span.add_event("diff_file_read", attributes={"content_length": len(data)}) # span.add_event("starting_agent_run") # res = code_review_agent.run_sync("", deps=data) # span.add_event("agent_run_completed", attributes={ # "output_length": len(res.output) if res.output else 0 # }) # print(res.output) # span.add_event("test_completed_successfully") # except Exception as e: # # Log error # span.record_exception(e) # span.set_status(logfire.StatusCode.ERROR, str(e)) # raise diff = read_text_file("DIFF.md") data = code_review_agent.run_sync("", deps = DiffDeps(diff = diff)) print(data)