| |
| """ |
| PR Tracker - Fetches PRs that need review using GitHub API. |
| |
| This module provides data about open PRs. Claude handles orchestration, |
| review logic, and posting reviews. |
| |
| Usage: |
| # Get PRs updated in the last 6 hours |
| python3 scripts/pr_tracker.py --list --since 6h |
| |
| # Get PRs updated since a specific time |
| python3 scripts/pr_tracker.py --list --since 2024-01-13T00:00:00Z |
| |
| # Get details for a specific PR |
| python3 scripts/pr_tracker.py --details 123 |
| """ |
|
|
| import json |
| import os |
| import re |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Optional |
|
|
| from github import Github, Auth |
|
|
| |
| DEFAULT_REPO = "meta-pytorch/OpenEnv" |
| DEFAULT_STATE_FILE = Path.home() / ".openenv-review-state.json" |
|
|
|
|
| def _get_github_client() -> Github: |
| """Get authenticated GitHub client.""" |
| token = os.environ.get("GITHUB_TOKEN") |
| if token: |
| return Github(auth=Auth.Token(token)) |
|
|
| |
| try: |
| import subprocess |
|
|
| result = subprocess.run( |
| ["gh", "auth", "token"], |
| capture_output=True, |
| text=True, |
| check=True, |
| ) |
| token = result.stdout.strip() |
| return Github(auth=Auth.Token(token)) |
| except (subprocess.CalledProcessError, FileNotFoundError): |
| pass |
|
|
| raise RuntimeError( |
| "No GitHub token found. Set GITHUB_TOKEN env var or authenticate with 'gh auth login'" |
| ) |
|
|
|
|
| def parse_since(since_str: str) -> datetime: |
| """ |
| Parse a 'since' argument into a datetime. |
| |
| Accepts: |
| - Duration: "6h", "1d", "30m", "2w" |
| - ISO timestamp: "2024-01-13T00:00:00Z" |
| """ |
| |
| duration_match = re.match(r"^(\d+)([mhdw])$", since_str.lower()) |
| if duration_match: |
| value = int(duration_match.group(1)) |
| unit = duration_match.group(2) |
| unit_map = { |
| "m": timedelta(minutes=value), |
| "h": timedelta(hours=value), |
| "d": timedelta(days=value), |
| "w": timedelta(weeks=value), |
| } |
| return datetime.now(timezone.utc) - unit_map[unit] |
|
|
| |
| try: |
| |
| if since_str.endswith("Z"): |
| since_str = since_str[:-1] + "+00:00" |
| return datetime.fromisoformat(since_str) |
| except ValueError: |
| pass |
|
|
| raise ValueError( |
| f"Invalid 'since' format: {since_str}. " |
| "Use duration (6h, 1d, 30m, 2w) or ISO timestamp (2024-01-13T00:00:00Z)" |
| ) |
|
|
|
|
| def get_prs_needing_review( |
| repo: str = DEFAULT_REPO, |
| since: Optional[datetime] = None, |
| state_file: Optional[Path] = None, |
| ) -> list[dict]: |
| """ |
| Get list of PRs that need review. |
| |
| Args: |
| repo: Repository name (owner/repo) |
| since: Only return PRs updated after this time |
| state_file: Optional state file for SHA-based tracking (legacy) |
| |
| Returns: |
| List of dicts with PR info: |
| - number: PR number |
| - title: PR title |
| - author: Author username |
| - url: PR URL |
| - head_sha: Current commit SHA |
| - updated_at: Last update time (ISO format) |
| """ |
| gh = _get_github_client() |
| repo_obj = gh.get_repo(repo) |
|
|
| |
| repo_state = {} |
| if state_file and state_file.exists(): |
| try: |
| state = json.loads(state_file.read_text()) |
| repo_state = state.get(repo, {}) |
| except json.JSONDecodeError: |
| pass |
|
|
| prs = [] |
| for pr in repo_obj.get_pulls(state="open"): |
| if pr.draft: |
| continue |
|
|
| |
| if since and pr.updated_at < since: |
| continue |
|
|
| |
| if state_file: |
| pr_state = repo_state.get(str(pr.number), {}) |
| if pr_state.get("last_reviewed_sha") == pr.head.sha: |
| continue |
|
|
| prs.append( |
| { |
| "number": pr.number, |
| "title": pr.title, |
| "author": pr.user.login, |
| "url": pr.html_url, |
| "head_sha": pr.head.sha, |
| "updated_at": pr.updated_at.isoformat(), |
| } |
| ) |
|
|
| return prs |
|
|
|
|
| def get_pr_details(pr_number: int, repo: str = DEFAULT_REPO) -> dict: |
| """Get detailed information about a specific PR.""" |
| gh = _get_github_client() |
| repo_obj = gh.get_repo(repo) |
| pr = repo_obj.get_pull(pr_number) |
|
|
| return { |
| "number": pr.number, |
| "title": pr.title, |
| "author": pr.user.login, |
| "url": pr.html_url, |
| "head_sha": pr.head.sha, |
| "updated_at": pr.updated_at.isoformat(), |
| "body": pr.body or "", |
| "files": [f.filename for f in pr.get_files()], |
| "diff_url": pr.diff_url, |
| "additions": pr.additions, |
| "deletions": pr.deletions, |
| "changed_files": pr.changed_files, |
| } |
|
|
|
|
| def record_review( |
| pr_number: int, |
| commit_sha: str, |
| verdict: str, |
| repo: str = DEFAULT_REPO, |
| state_file: Path = DEFAULT_STATE_FILE, |
| ): |
| """Record that a PR was reviewed (for SHA-based tracking).""" |
| state = {} |
| if state_file.exists(): |
| try: |
| state = json.loads(state_file.read_text()) |
| except json.JSONDecodeError: |
| pass |
|
|
| if repo not in state: |
| state[repo] = {} |
|
|
| state[repo][str(pr_number)] = { |
| "last_reviewed_sha": commit_sha, |
| "review_timestamp": datetime.now(timezone.utc).isoformat(), |
| "verdict": verdict, |
| } |
|
|
| state_file.parent.mkdir(parents=True, exist_ok=True) |
| state_file.write_text(json.dumps(state, indent=2)) |
| state_file.chmod(0o600) |
|
|
|
|
| def post_review( |
| pr_number: int, |
| verdict: str, |
| body: str, |
| repo: str = DEFAULT_REPO, |
| ): |
| """ |
| Post a review to a PR. |
| |
| Args: |
| pr_number: PR number |
| verdict: One of "approve", "comment", "request_changes" |
| body: Review body (markdown) |
| """ |
| gh = _get_github_client() |
| repo_obj = gh.get_repo(repo) |
| pr = repo_obj.get_pull(pr_number) |
|
|
| event_map = { |
| "approve": "APPROVE", |
| "comment": "COMMENT", |
| "request_changes": "REQUEST_CHANGES", |
| } |
| event = event_map.get(verdict, "COMMENT") |
|
|
| |
| current_user = gh.get_user().login |
| if event == "APPROVE" and pr.user.login == current_user: |
| pr.create_issue_comment(body) |
| return |
|
|
| formatted_body = f"""> **Note**: This is an automated review by **Claude Code**, not a human review. |
| |
| --- |
| |
| {body} |
| |
| --- |
| *Automated review by Claude Code | [Learn more](https://github.com/meta-pytorch/OpenEnv/blob/main/CLAUDE.md)*""" |
|
|
| pr.create_review(body=formatted_body, event=event) |
|
|
|
|
| if __name__ == "__main__": |
| import argparse |
|
|
| parser = argparse.ArgumentParser( |
| description="PR Tracker - Fetch PRs needing review", |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| epilog=""" |
| Examples: |
| # PRs updated in last 6 hours |
| python3 pr_tracker.py --list --since 6h |
| |
| # PRs updated in last day |
| python3 pr_tracker.py --list --since 1d |
| |
| # PRs updated since specific time |
| python3 pr_tracker.py --list --since 2024-01-13T00:00:00Z |
| |
| # Get details for PR #123 |
| python3 pr_tracker.py --details 123 |
| """, |
| ) |
| parser.add_argument("--list", action="store_true", help="List PRs needing review") |
| parser.add_argument( |
| "--details", type=int, metavar="PR", help="Get details for specific PR" |
| ) |
| parser.add_argument( |
| "--since", |
| type=str, |
| help="Only PRs updated since (e.g., 6h, 1d, 2024-01-13T00:00:00Z)", |
| ) |
| parser.add_argument( |
| "--repo", default=DEFAULT_REPO, help="Repository (default: %(default)s)" |
| ) |
| parser.add_argument( |
| "--use-state", action="store_true", help="Also filter by SHA state file" |
| ) |
| args = parser.parse_args() |
|
|
| if args.list: |
| since = parse_since(args.since) if args.since else None |
| state_file = DEFAULT_STATE_FILE if args.use_state else None |
| prs = get_prs_needing_review(repo=args.repo, since=since, state_file=state_file) |
| print(json.dumps(prs, indent=2)) |
| elif args.details: |
| details = get_pr_details(args.details, repo=args.repo) |
| print(json.dumps(details, indent=2)) |
| else: |
| parser.print_help() |
|
|