Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import List, Optional, Dict, Iterable, Tuple | |
| from pydantic import BaseModel, Field | |
| from pydantic_ai import Agent, RunContext | |
| from dotenv import load_dotenv | |
| import logfire | |
| load_dotenv() | |
| # ============================= | |
| # Logfire configuration for tracing | |
| # ============================= | |
| logfire.configure(token = os.getenv('LOGFIRE_API_KEY')) | |
| logfire.instrument_pydantic_ai() | |
| # ============================= | |
| # Data models for structured output | |
| # ============================= | |
| class Issue(BaseModel): | |
| title: str = Field(..., description="Short, actionable issue title") | |
| description: str = Field(..., description="Clear explanation, why it matters, and how to fix") | |
| severity: str = Field(..., description="One of: low, medium, high, critical") | |
| line: Optional[int] = Field(None, description="Line number if known/applicable") | |
| rule: Optional[str] = Field(None, description="Optional rule or best practice identifier") | |
| class FileReview(BaseModel): | |
| file_path: str | |
| summary: str | |
| score: int = Field(..., ge=0, le=10, description="10 = excellent, 0 = very poor") | |
| issues: List[Issue] = Field(default_factory=list) | |
| suggestions: List[str] = Field(default_factory=list) | |
| class CodeReviewResponse(BaseModel): | |
| overall_summary: str | |
| overall_score: int = Field(..., ge=0, le=10) | |
| files: List[FileReview] | |
| quick_actions: List[str] = Field(default_factory=list, description="Concise TODOs that can be applied immediately") | |
| class DiffDeps(BaseModel): | |
| diff: str | |
| # ============================= | |
| # Agent definition (Pydantic AI) | |
| # ============================= | |
| DEFAULT_MODEL = "google-gla:gemini-2.5-pro" | |
| code_review_agent = Agent( | |
| model = DEFAULT_MODEL, | |
| deps_type = DiffDeps, | |
| output_type = str, | |
| ) | |
| def systemt_prompt(ctx: RunContext) -> str: | |
| return f""" | |
| You are a code review agent focused on analyzing pull request changes and generating concise summary change logs. | |
| ## Input Processing | |
| First, carefully examine the provided diff: | |
| {ctx.deps.diff} | |
| **Important**: Before proceeding with the review, check if the diff contains any actual changes: | |
| - If the diff is empty, contains only whitespace, or shows no meaningful modifications | |
| - If the diff indicates "no changes" or similar status | |
| - If all changes are just formatting/whitespace without functional impact | |
| ## Your Role | |
| - Review code changes in pull requests | |
| - Generate clear, actionable summary change logs | |
| - Focus solely on what changed and its impact | |
| - Provide appropriate responses when no changes are detected | |
| ## What to Review | |
| - **Code modifications**: Added, deleted, or modified lines | |
| - **Functional changes**: New features, bug fixes, refactoring | |
| - **Structural changes**: File additions/deletions, directory reorganization | |
| - **Dependency updates**: Package changes, version bumps | |
| - **Configuration changes**: Environment, build, or deployment configurations | |
| ## Response Logic | |
| ### If NO changes are detected: | |
| Simply respond with: | |
| ``` | |
| No changes detected in this pull request. | |
| ``` | |
| ### If changes ARE detected, include in summary: | |
| 1. **High-level overview**: Brief description of the PR's purpose | |
| 2. **Key changes**: List of main modifications made | |
| 3. **Files affected**: Count and types of files changed | |
| 4. **Impact assessment**: Brief note on potential effects | |
| 5. **Breaking changes**: Highlight any breaking changes prominently | |
| ## Output Format (for PRs with changes) | |
| ``` | |
| ## Pull Request Summary | |
| **Purpose**: [Brief description of what this PR accomplishes] | |
| **Changes Made**: | |
| - [Change 1 with file reference and brief description] | |
| - [Change 2 with file reference and brief description] | |
| - [Change 3 with file reference and brief description] | |
| **Files Modified**: X files changed (+Y additions, -Z deletions) | |
| **Breaking Changes**: [If any, list them here, otherwise state "None"] | |
| **Impact**: [Brief assessment of the changes' significance and potential effects] | |
| ``` | |
| ## What NOT to Focus On | |
| - Code style preferences (unless specifically requested) | |
| - Performance optimizations (unless critical) | |
| - Architecture discussions | |
| - Non-functional requirements | |
| - Testing strategies (unless tests are part of the changes) | |
| ## Guidelines | |
| - **Always check for actual changes first** - don't generate summaries for empty diffs | |
| - Keep summaries concise but informative (aim for 3-5 bullet points maximum) | |
| - Use clear, non-technical language when possible | |
| - Highlight breaking changes prominently with clear warnings | |
| - Focus on the "what" and "impact" not the "how" or "why" | |
| - Maintain objectivity in descriptions | |
| - Be specific about file types and locations when relevant | |
| - If changes are minimal (e.g., only comments or whitespace), mention this explicitly | |
| ## Edge Cases | |
| - **Empty diff**: Respond with "No changes detected in this pull request." | |
| - **Only whitespace/formatting changes**: Mention this explicitly: "Only formatting/whitespace changes detected." | |
| - **Very large diffs**: Focus on the most significant changes and note if summary is abbreviated | |
| - **Binary files**: Note that binary files were changed but cannot be reviewed in detail | |
| """ | |
| def read_text_file(path: str) -> str: | |
| """Read a UTF-8 text file from disk and return its contents. Truncates very large files. | |
| Args: | |
| path: Absolute or relative path to a text file. | |
| Returns: | |
| File text content (possibly truncated to keep context size reasonable). | |
| """ | |
| file_path = Path(path) | |
| if not file_path.exists() or not file_path.is_file(): | |
| raise FileNotFoundError(f"File not found: {path}") | |
| try: | |
| text = file_path.read_text(encoding="utf-8", errors="ignore") | |
| except Exception as exc: # pragma: no cover - defensive | |
| raise RuntimeError(f"Failed to read file: {path}: {exc}") | |
| max_chars = 200_000 | |
| if len(text) > max_chars: | |
| head = text[: max_chars // 2] | |
| tail = text[-max_chars // 2 :] | |
| return f"{head}\n\n... [truncated] ...\n\n{tail}" | |
| return text | |
| def list_code_files( | |
| ctx: RunContext, | |
| root: str, | |
| include_extensions: List[str] | None = None, | |
| exclude_dirs: List[str] | None = None, | |
| max_files: int = 200, | |
| ) -> List[str]: | |
| """List code files under a directory. | |
| Args: | |
| root: Directory root to scan | |
| include_extensions: e.g. [".py", ".ts", ".js", ".tsx", ".java"]. If omitted, uses a sensible default set. | |
| exclude_dirs: Directory names to skip (e.g. ["node_modules", ".git", "dist", "build"]). | |
| max_files: Upper bound on number of results to avoid huge contexts. | |
| Returns: | |
| List of file paths (strings) relative to the provided root where possible. | |
| """ | |
| root_path = Path(root) | |
| if not root_path.exists(): | |
| raise FileNotFoundError(f"Root directory not found: {root}") | |
| default_exts = [ | |
| ".py", | |
| ".ts", | |
| ".tsx", | |
| ".js", | |
| ".jsx", | |
| ".java", | |
| ".kt", | |
| ".go", | |
| ".rs", | |
| ".rb", | |
| ".php", | |
| ".cs", | |
| ".cpp", | |
| ".cc", | |
| ".c", | |
| ".m", | |
| ".mm", | |
| ".sql", | |
| ".yml", | |
| ".yaml", | |
| ".toml", | |
| ".json", | |
| ".md", | |
| ] | |
| exts = [e.lower() for e in (include_extensions or default_exts)] | |
| excluded = set(exclude_dirs or {".git", "node_modules", "dist", "build", ".venv", "__pycache__"}) | |
| results: List[str] = [] | |
| for path in root_path.rglob("*"): | |
| if path.is_dir(): | |
| if path.name in excluded: | |
| # Skip excluded directories entirely | |
| # Use try/except to ignore permission errors | |
| try: | |
| # Prevent descending further | |
| continue | |
| finally: | |
| ... | |
| continue | |
| if path.suffix.lower() in exts: | |
| results.append(str(path)) | |
| if len(results) >= max_files: | |
| break | |
| return results | |
| # ============================= | |
| # Utilities | |
| # ============================= | |
| def gather_targets(paths: List[str]) -> Tuple[List[str], List[str]]: | |
| """Split inputs into files and directories; expand files list from directories. | |
| Returns (files, dirs) | |
| """ | |
| files: List[str] = [] | |
| dirs: List[str] = [] | |
| for p in paths: | |
| path = Path(p) | |
| if path.is_file(): | |
| files.append(str(path)) | |
| elif path.is_dir(): | |
| dirs.append(str(path)) | |
| else: | |
| # Ignore non-existent | |
| continue | |
| return files, dirs | |
| def build_user_prompt( | |
| files: List[str], | |
| dirs: List[str], | |
| focus_areas: List[str], | |
| max_inline_chars: int = 60_000, | |
| ) -> str: | |
| """Create a concise instruction for the agent, listing files and review goals. | |
| We do not inline large file contents; the agent can use tools to load them on demand. | |
| Small files may be inlined to reduce tool calls. | |
| """ | |
| focus_text = ", ".join(focus_areas) if focus_areas else "general quality" | |
| # Try to inline very small files to prime the context | |
| inline_blobs: List[str] = [] | |
| inlined_total = 0 | |
| for f in files: | |
| try: | |
| text = Path(f).read_text(encoding="utf-8", errors="ignore") | |
| except Exception: | |
| continue | |
| if len(text) <= 8_000 and (inlined_total + len(text)) <= max_inline_chars: | |
| inline_blobs.append(f"File: {f}\n\n{text}") | |
| inlined_total += len(text) | |
| file_list_section = "\n".join(f"- {p}" for p in files) | |
| dir_list_section = "\n".join(f"- {d}" for d in dirs) | |
| inline_section = ("\n\n" + "\n\n".join(inline_blobs)) if inline_blobs else "" | |
| return ( | |
| "Perform a comprehensive code review for the repository subset below.\n\n" | |
| f"Focus areas: {focus_text}.\n\n" | |
| "Files:\n" + file_list_section + "\n\n" | |
| + ("Directories (you may list and inspect files using the provided tools):\n" + dir_list_section + "\n\n" if dirs else "") | |
| + "Use the read_text_file and list_code_files tools to fetch any file content you need.\n" | |
| + inline_section | |
| ) | |
| def render_markdown(result: CodeReviewResponse) -> str: | |
| """Render a human-readable Markdown report from the structured output.""" | |
| lines: List[str] = [] | |
| lines.append("# Code Review Report") | |
| lines.append("") | |
| lines.append(f"Overall Score: {result.overall_score}/10") | |
| lines.append("") | |
| lines.append(result.overall_summary) | |
| lines.append("") | |
| for f in result.files: | |
| lines.append(f"## {f.file_path} — Score: {f.score}/10") | |
| lines.append("") | |
| if f.summary: | |
| lines.append(f.summary) | |
| lines.append("") | |
| if f.issues: | |
| lines.append("### Issues") | |
| for idx, issue in enumerate(f.issues, start=1): | |
| where = f" (line {issue.line})" if issue.line is not None else "" | |
| rule = f" — {issue.rule}" if issue.rule else "" | |
| lines.append(f"- [{issue.severity.upper()}]{where}{rule}: {issue.title}") | |
| lines.append(f" - {issue.description}") | |
| lines.append("") | |
| if f.suggestions: | |
| lines.append("### Suggestions") | |
| for s in f.suggestions: | |
| lines.append(f"- {s}") | |
| lines.append("") | |
| if result.quick_actions: | |
| lines.append("## Quick Actions") | |
| for qa in result.quick_actions: | |
| lines.append(f"- {qa}") | |
| lines.append("") | |
| return "\n".join(lines) | |
| # ============================= | |
| # Public API | |
| # ============================= | |
| async def review_paths( | |
| paths: List[str], | |
| focus_areas: Optional[List[str]] = None, | |
| model: Optional[str] = None, | |
| ) -> CodeReviewResponse: | |
| # Start tracing span | |
| with logfire_client.span("review_paths", attributes={ | |
| "paths_count": len(paths), | |
| "focus_areas": focus_areas or [], | |
| "model": model or "default" | |
| }) as span: | |
| try: | |
| files, dirs = gather_targets(paths) | |
| # Log file analysis | |
| span.add_event("files_analyzed", attributes={ | |
| "files_count": len(files), | |
| "directories_count": len(dirs) | |
| }) | |
| agent = code_review_agent if model is None else Agent( | |
| model=model, | |
| result_model=CodeReviewResponse, | |
| system_prompt=systemt_prompt, # Use the system prompt directly | |
| ) | |
| user_prompt = build_user_prompt(files, dirs, focus_areas or []) | |
| # Log prompt generation | |
| span.add_event("prompt_generated", attributes={ | |
| "prompt_length": len(user_prompt) | |
| }) | |
| run = await agent.run(user_prompt) | |
| # Log successful completion | |
| span.add_event("review_completed", attributes={ | |
| "overall_score": run.data.overall_score, | |
| "files_reviewed": len(run.data.files) | |
| }) | |
| return run.data | |
| except Exception as e: | |
| # Log error | |
| span.record_exception(e) | |
| span.set_status(logfire.StatusCode.ERROR, str(e)) | |
| raise | |
| async def review_code_string( | |
| code: str, | |
| filename: str = "snippet", | |
| focus_areas: Optional[List[str]] = None, | |
| model: Optional[str] = None, | |
| ) -> CodeReviewResponse: | |
| # Start tracing span | |
| with logfire_client.span("review_code_string", attributes={ | |
| "filename": filename, | |
| "code_length": len(code), | |
| "focus_areas": focus_areas or [], | |
| "model": model or "default" | |
| }) as span: | |
| try: | |
| agent = code_review_agent if model is None else Agent( | |
| model=model, | |
| result_model=CodeReviewResponse, | |
| system_prompt=systemt_prompt, # Use the system prompt directly | |
| ) | |
| prompt = ( | |
| f"Review the following code ({filename}).\n\n" # noqa: E501 | |
| f"Focus areas: {', '.join(focus_areas or []) or 'general quality'}.\n\n" | |
| f"{code}" | |
| ) | |
| # Log prompt generation | |
| span.add_event("prompt_generated", attributes={ | |
| "prompt_length": len(prompt) | |
| }) | |
| run = await agent.run(prompt) | |
| # Log successful completion | |
| span.add_event("review_completed", attributes={ | |
| "overall_score": run.data.overall_score, | |
| "files_reviewed": len(run.data.files) | |
| }) | |
| return run.data | |
| except Exception as e: | |
| # Log error | |
| span.record_exception(e) | |
| span.set_status(logfire.StatusCode.ERROR, str(e)) | |
| raise | |
| # ============================= | |
| # CLI | |
| # ============================= | |
| def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description="Run code review agent using pydantic-ai") | |
| parser.add_argument( | |
| "paths", | |
| nargs="*", | |
| help="Files or directories to review", | |
| ) | |
| parser.add_argument( | |
| "--focus", | |
| nargs="*", | |
| default=[], | |
| help="Optional focus areas, e.g. security performance readability accessibility", | |
| ) | |
| parser.add_argument( | |
| "--model", | |
| default=None, | |
| help="Model id, e.g. openai:gpt-4o or openai:gpt-4o-mini (defaults to env CODE_REVIEW_MODEL or gpt-4o-mini)", | |
| ) | |
| parser.add_argument( | |
| "--out", | |
| default=None, | |
| help="If provided, write a Markdown report to this file", | |
| ) | |
| return parser.parse_args(argv) | |
| def main(argv: Optional[List[str]] = None) -> int: | |
| # Start tracing span for CLI execution | |
| with logfire_client.span("cli_execution", attributes={ | |
| "argv": argv or [] | |
| }) as span: | |
| try: | |
| args = parse_args(argv) | |
| if not args.paths: | |
| span.add_event("no_paths_provided") | |
| print("No input paths provided. Nothing to review.") | |
| return 2 | |
| # Log CLI arguments | |
| span.add_event("cli_args_parsed", attributes={ | |
| "paths_count": len(args.paths), | |
| "focus_areas": args.focus, | |
| "model": args.model, | |
| "output_file": args.out | |
| }) | |
| result = asyncio.run(review_paths(args.paths, focus_areas=args.focus, model=args.model)) | |
| md = render_markdown(result) | |
| if args.out: | |
| Path(args.out).write_text(md, encoding="utf-8") | |
| span.add_event("report_saved", attributes={"output_file": args.out}) | |
| print(f"Saved review report to {args.out}") | |
| else: | |
| span.add_event("report_printed_to_console") | |
| print(md) | |
| span.add_event("cli_completed_successfully") | |
| return 0 | |
| except Exception as e: | |
| # Log error | |
| span.record_exception(e) | |
| span.set_status(logfire.StatusCode.ERROR, str(e)) | |
| raise | |
| if __name__ == "__main__": | |
| # Start tracing span for test execution | |
| # with logfire_client.span("test_execution", attributes={ | |
| # "test_type": "diff_review" | |
| # }) as span: | |
| # try: | |
| # path = "DIFF.md" | |
| # span.add_event("reading_diff_file", attributes={"file_path": path}) | |
| # data = read_text_file(path) | |
| # span.add_event("diff_file_read", attributes={"content_length": len(data)}) | |
| # span.add_event("starting_agent_run") | |
| # res = code_review_agent.run_sync("", deps=data) | |
| # span.add_event("agent_run_completed", attributes={ | |
| # "output_length": len(res.output) if res.output else 0 | |
| # }) | |
| # print(res.output) | |
| # span.add_event("test_completed_successfully") | |
| # except Exception as e: | |
| # # Log error | |
| # span.record_exception(e) | |
| # span.set_status(logfire.StatusCode.ERROR, str(e)) | |
| # raise | |
| diff = read_text_file("DIFF.md") | |
| data = code_review_agent.run_sync("", deps = DiffDeps(diff = diff)) | |
| print(data) | |