"""Canonical duplicate-PR staging: select a ranked duplicate-PR cluster from an
analysis snapshot, stage prompt/context artifacts for a Codex synthesis run,
and normalize a successful Codex result into publish-ready metadata."""

from __future__ import annotations

import argparse
import json
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol

from slop_farmer.config import resolve_github_token
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.normalize import (
    normalize_comment,
    normalize_pr_diff,
    normalize_pr_file,
    normalize_review,
    normalize_review_comment,
)
from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json, write_text

# Prompt template and JSON-schema artifacts that live next to this module.
PROMPT_TEMPLATE_PATH = Path(__file__).resolve().parent / "prompts" / "canonical_duplicate_pr.md"
SCHEMA_PATH = (
    Path(__file__).resolve().parent / "schemas" / "canonical_duplicate_pr_output.schema.json"
)

# Navigation:
# - snapshot loading + ranked cluster selection
# - stage_run_bundle(): prepare prompt/context artifacts for synthesis
# - prepare_publish_artifacts(): turn a successful Codex result into PR-ready output
# - context builders + GitHub fallback helpers
# - tiny CLI at the bottom for manual staging / publish prep


@dataclass(slots=True)
class SnapshotBundle:
    """In-memory view of one extracted snapshot directory.

    Holds the analysis report plus the raw row tables read from the
    snapshot's parquet files (issues, PRs, comments, reviews, diffs).
    """

    # Resolved path to analysis-report.json and its containing directory.
    report_path: Path
    snapshot_dir: Path
    # Repo in owner/name form; snapshot identity and extraction timestamp
    # (empty strings when the manifest/report do not provide them).
    repo: str
    snapshot_id: str
    extracted_at: str
    # Parsed analysis report.
    report: dict[str, Any]
    # Row tables loaded from the snapshot's *.parquet files.
    issues: list[dict[str, Any]]
    pull_requests: list[dict[str, Any]]
    comments: list[dict[str, Any]]
    reviews: list[dict[str, Any]]
    review_comments: list[dict[str, Any]]
    pr_files: list[dict[str, Any]]
    pr_diffs: list[dict[str, Any]]


class GitHubClientLike(Protocol):
    """Structural interface for the subset of GitHubClient this module uses.

    Allows tests to inject a fake client; the real implementation is
    slop_farmer.data.github_api.GitHubClient.
    """

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...

    def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ...

    def iter_issue_comments_for_number(
        self, owner: str, repo: str, number: int, since: str | None
    ) -> Iterable[dict[str, Any]]: ...

    def iter_pull_reviews(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...

    def iter_pull_review_comments(
        self, owner: str, repo: str, number: int
    ) -> Iterable[dict[str, Any]]: ...
# Snapshot loading and cluster selection


def load_snapshot_bundle(report_path: Path) -> SnapshotBundle:
    """Load a snapshot directory (report, manifest, parquet tables) into a bundle.

    The snapshot directory is inferred as the parent of *report_path*.
    A missing manifest.json is tolerated; repo/snapshot_id/extracted_at then
    fall back to report fields or the directory name, defaulting to "".
    """
    resolved_report = report_path.resolve()
    snapshot_dir = resolved_report.parent
    manifest_path = snapshot_dir / "manifest.json"
    manifest = read_json(manifest_path) if manifest_path.exists() else {}
    report = read_json(resolved_report)
    # Report wins over manifest for repo/snapshot_id; manifest wins for timestamp.
    repo = str(report.get("repo") or manifest.get("repo") or "")
    snapshot_id = str(report.get("snapshot_id") or manifest.get("snapshot_id") or snapshot_dir.name)
    extracted_at = str(manifest.get("extracted_at") or report.get("generated_at") or "")
    return SnapshotBundle(
        report_path=resolved_report,
        snapshot_dir=snapshot_dir,
        repo=repo,
        snapshot_id=snapshot_id,
        extracted_at=extracted_at,
        report=report,
        issues=read_parquet_rows(snapshot_dir / "issues.parquet"),
        pull_requests=read_parquet_rows(snapshot_dir / "pull_requests.parquet"),
        comments=read_parquet_rows(snapshot_dir / "comments.parquet"),
        reviews=read_parquet_rows(snapshot_dir / "reviews.parquet"),
        review_comments=read_parquet_rows(snapshot_dir / "review_comments.parquet"),
        pr_files=read_parquet_rows(snapshot_dir / "pr_files.parquet"),
        pr_diffs=read_parquet_rows(snapshot_dir / "pr_diffs.parquet"),
    )


def select_ranked_duplicate_pr_clusters(
    bundle: SnapshotBundle,
    *,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Return eligible duplicate-PR cluster candidates in meta-bug rank order.

    A meta-bug qualifies only when its cluster_id appears in the report's
    duplicate_prs section AND at least two of its PRs are open and non-draft.
    Each candidate dict merges meta-bug fields with the duplicate-cluster row.
    *limit*, when given, caps the number of candidates returned.
    """
    # Index duplicate-PR cluster rows by cluster_id (rows without one are skipped).
    duplicate_prs = {
        str(row.get("cluster_id")): row
        for row in bundle.report.get("duplicate_prs", [])
        if row.get("cluster_id")
    }
    pull_request_map = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    candidates: list[dict[str, Any]] = []
    # meta_bugs order defines the ranking; rank_index is 1-based.
    for rank_index, meta_bug in enumerate(bundle.report.get("meta_bugs", []), start=1):
        cluster_id = str(meta_bug.get("cluster_id") or "")
        duplicate_cluster = duplicate_prs.get(cluster_id)
        if not duplicate_cluster:
            continue
        all_pr_numbers = _ordered_ints(meta_bug.get("pr_numbers"))
        open_source_pr_numbers = [
            number
            for number in all_pr_numbers
            if _is_open_non_draft_pull_request(pull_request_map.get(number))
        ]
        # Need at least two open non-draft PRs for a meaningful canonical merge.
        if len(open_source_pr_numbers) < 2:
            continue
        candidate = {
            "cluster_id": cluster_id,
            "rank_index": rank_index,
            "canonical_pr_number": _coerce_int(meta_bug.get("canonical_pr_number")),
            "canonical_issue_number": _coerce_int(meta_bug.get("canonical_issue_number")),
            "target_issue_number": _coerce_int(duplicate_cluster.get("target_issue_number")),
            "all_pr_numbers": all_pr_numbers,
            "duplicate_pr_numbers": _ordered_ints(duplicate_cluster.get("duplicate_pr_numbers")),
            "source_pr_numbers": open_source_pr_numbers,
            "issue_numbers": _ordered_ints(meta_bug.get("issue_numbers")),
            "summary": meta_bug.get("summary"),
            "status": meta_bug.get("status"),
            "confidence": meta_bug.get("confidence"),
            "evidence_types": list(meta_bug.get("evidence_types") or []),
            "reason": duplicate_cluster.get("reason"),
        }
        candidates.append(candidate)
        if limit is not None and len(candidates) >= limit:
            break
    return candidates


def select_ranked_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None = None,
    max_clusters: int = 1,
) -> dict[str, Any]:
    """Select one eligible cluster: an explicit *cluster_id*, or the top-ranked one.

    Raises ValueError when max_clusters < 1, when the requested cluster is
    unknown or ineligible (fewer than 2 open non-draft PRs), or when no
    cluster at all is eligible. NOTE: max_clusters only validates/limits the
    candidate window; a single cluster is always returned.
    """
    if max_clusters < 1:
        raise ValueError("--max-clusters must be at least 1")
    candidates = select_ranked_duplicate_pr_clusters(bundle)
    if cluster_id:
        for candidate in candidates:
            if candidate["cluster_id"] == cluster_id:
                return candidate
        # Not eligible — distinguish "known but ineligible" from "unknown".
        known_cluster_ids = {
            str(row.get("cluster_id"))
            for row in bundle.report.get("duplicate_prs", [])
            if row.get("cluster_id")
        }
        if cluster_id in known_cluster_ids:
            raise ValueError(
                f"Cluster {cluster_id} does not have at least 2 open non-draft pull requests."
            )
        raise ValueError(f"Unknown duplicate PR cluster: {cluster_id}")
    limited = candidates[:max_clusters]
    if not limited:
        raise ValueError("No duplicate PR cluster has at least 2 open non-draft pull requests.")
    return limited[0]


# Prompt/context staging and publish prep


def stage_run_bundle(
    report_path: Path,
    run_dir: Path,
    *,
    selected_cluster: dict[str, Any] | None = None,
    cluster_id: str | None = None,
    max_clusters: int = 1,
    github_client: GitHubClientLike | None = None,
    prompt_repo: str | None = None,
    prompt_default_branch: str | None = None,
    prompt_file_policy_instruction: str | None = None,
) -> dict[str, Any]:
    """Stage all artifacts for one Codex run under *run_dir* and return the manifest.

    Selects a cluster (unless *selected_cluster* is supplied), writes per-PR
    and issue context JSON, renders the prompt from the module template, and
    records every artifact path — including result/publish paths that later
    steps will fill in — in run-manifest.json.

    NOTE(review): assumes write_json/write_text create parent directories as
    needed — confirm against slop_farmer.data.parquet_io.
    """
    bundle = load_snapshot_bundle(report_path)
    candidates = select_ranked_duplicate_pr_clusters(bundle)
    if selected_cluster is None:
        selected_cluster = select_ranked_duplicate_pr_cluster(
            bundle,
            cluster_id=cluster_id,
            max_clusters=max_clusters,
        )
    pr_contexts = _build_pull_request_contexts(
        bundle, selected_cluster, github_client=github_client
    )
    issue_context = _build_issue_context(bundle, selected_cluster)
    # Fixed artifact layout inside the run directory.
    selected_cluster_path = run_dir / "selected-cluster.json"
    context_dir = run_dir / "context"
    pr_context_dir = context_dir / "prs"
    cluster_context_path = context_dir / "cluster.json"
    issue_context_path = context_dir / "issue.json"
    prompt_path = run_dir / "codex-prompt.md"
    manifest_path = run_dir / "run-manifest.json"
    result_path = run_dir / "codex-final.json"
    last_message_path = run_dir / "codex-last-message.json"
    publish_metadata_path = run_dir / "publish-metadata.json"
    pr_body_path = run_dir / "pr-body.md"
    pr_url_path = run_dir / "pr-url.txt"
    write_json(selected_cluster, selected_cluster_path)
    # One context file per source PR, recorded with its absolute path.
    pr_context_files: list[dict[str, Any]] = []
    for pr_context in pr_contexts:
        pr_number = int(pr_context["pull_request"]["number"])
        path = pr_context_dir / f"pr-{pr_number}.json"
        write_json(pr_context, path)
        pr_context_files.append(
            {
                "pr_number": pr_number,
                "path": str(path.resolve()),
            }
        )
    # issue.json is only written when the cluster has a target issue.
    issue_context_file: str | None = None
    if issue_context is not None:
        write_json(issue_context, issue_context_path)
        issue_context_file = str(issue_context_path.resolve())
    cluster_context = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "default_branch": prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        "selected_cluster": selected_cluster,
        "target_issue_context_path": issue_context_file,
        "pull_request_context_files": pr_context_files,
    }
    write_json(cluster_context, cluster_context_path)
    prompt_text = _render_prompt(
        selected_cluster=selected_cluster,
        selected_cluster_path=selected_cluster_path.resolve(),
        cluster_context_path=cluster_context_path.resolve(),
        pr_context_dir=pr_context_dir.resolve(),
        issue_context_path=issue_context_file,
        repo=prompt_repo or bundle.repo,
        default_branch=prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        file_policy_instruction=prompt_file_policy_instruction
        or _default_file_policy_instruction(),
    )
    write_text(prompt_text, prompt_path)
    # NOTE(review): _infer_default_branch is recomputed three times with the
    # same arguments (deterministic over the bundle, so harmless — could be
    # hoisted into a local).
    manifest = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "max_clusters": max_clusters,
        "prompt_repo": prompt_repo or bundle.repo,
        "default_branch": prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        "file_policy_instruction": prompt_file_policy_instruction
        or _default_file_policy_instruction(),
        "candidate_clusters": candidates[:max_clusters] if max_clusters > 0 else [],
        "selected_cluster": selected_cluster,
        "artifacts": {
            "selected_cluster_path": str(selected_cluster_path.resolve()),
            "cluster_context_path": str(cluster_context_path.resolve()),
            "pr_context_dir": str(pr_context_dir.resolve()),
            "issue_context_path": issue_context_file,
            "prompt_path": str(prompt_path.resolve()),
            "schema_path": str(SCHEMA_PATH.resolve()),
            "result_path": str(result_path.resolve()),
            "last_message_path": str(last_message_path.resolve()),
            "publish_metadata_path": str(publish_metadata_path.resolve()),
            "pr_body_path": str(pr_body_path.resolve()),
            "pr_url_path": str(pr_url_path.resolve()),
        },
    }
    write_json(manifest, manifest_path)
    return manifest


def prepare_publish_artifacts(manifest_path: Path, result_path: Path) -> dict[str, Any]:
    """Validate a Codex result against the staged manifest and emit publish artifacts.

    Writes pr-body.md and publish-metadata.json at the paths recorded in the
    manifest and returns the publish metadata dict.

    Raises ValueError when the result is not a success, targets a different
    cluster, references PRs outside the selected open set, references fewer
    than two PRs, or omits the PR title / summary / executed tests.
    """
    manifest = read_json(manifest_path.resolve())
    result = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    selected_cluster = manifest["selected_cluster"]
    if result.get("status") != "success":
        raise ValueError("Codex result did not report status=success.")
    if result.get("cluster_id") != selected_cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")
    expected_source_pr_numbers = _ordered_ints(selected_cluster.get("source_pr_numbers"))
    actual_source_pr_numbers = _ordered_ints(result.get("source_pr_numbers"))
    if len(actual_source_pr_numbers) < 2:
        raise ValueError("Codex result must reference at least two open source PRs.")
    expected_source_pr_set = set(expected_source_pr_numbers)
    # Reject any PR number the staging step did not approve.
    unknown_source_pr_numbers = [
        number for number in actual_source_pr_numbers if number not in expected_source_pr_set
    ]
    if unknown_source_pr_numbers:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(number) for number in unknown_source_pr_numbers)
        )
    # Re-order the result's PR numbers to match the staged (deterministic) order.
    # NOTE(review): set(actual_source_pr_numbers) is rebuilt on every iteration
    # of this comprehension — negligible at these sizes, but hoistable.
    actual_source_pr_numbers = [
        number for number in expected_source_pr_numbers if number in set(actual_source_pr_numbers)
    ]
    pr_title = str(result.get("pr_title") or "").strip()
    if not pr_title:
        raise ValueError("Codex result did not provide a PR title.")
    summary = str(result.get("summary") or "").strip()
    if not summary:
        raise ValueError("Codex result did not provide a summary.")
    # Keep only non-blank test commands, stringified and trimmed.
    tests_run = [
        str(value).strip() for value in result.get("tests_run") or [] if str(value).strip()
    ]
    if not tests_run:
        raise ValueError("Codex result did not provide any executed test commands.")
    final_body = _render_minimal_pr_body(
        summary=summary,
        target_issue_number=_coerce_int(selected_cluster.get("target_issue_number")),
        source_pr_numbers=actual_source_pr_numbers,
        tests_run=tests_run,
    )
    pr_body_path = Path(manifest["artifacts"]["pr_body_path"])
    publish_metadata_path = Path(manifest["artifacts"]["publish_metadata_path"])
    write_text(final_body, pr_body_path)
    publish_metadata = {
        "cluster_id": selected_cluster["cluster_id"],
        "canonical_pr_number": selected_cluster.get("canonical_pr_number"),
        "source_pr_numbers": actual_source_pr_numbers,
        "pr_title": pr_title,
        "pr_body_path": str(pr_body_path.resolve()),
        "summary": summary,
        "tests_run": tests_run,
    }
    write_json(publish_metadata, publish_metadata_path)
    return publish_metadata


# Snapshot/GitHub context builders


def _build_pull_request_contexts(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
    *,
    github_client: GitHubClientLike | None = None,
) -> list[dict[str, Any]]:
    """Build one context dict per source PR (files, diff, comments, reviews).

    Each section is read from the snapshot tables first; when a section is
    empty it falls back to the GitHub API (lazily constructing a client if
    none was injected) and records the origin under "context_source".

    Raises ValueError when a source PR has no row in the snapshot's
    pull_requests table.
    """
    pull_request_map = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    owner, repo = _split_repo(bundle.repo)
    contexts: list[dict[str, Any]] = []
    # Lazily created on first fallback, then reused for subsequent PRs.
    client = github_client
    for pr_number in selected_cluster["source_pr_numbers"]:
        pull_request = pull_request_map.get(pr_number)
        if pull_request is None:
            raise ValueError(f"Missing pull request row for #{pr_number}")
        # Changed files: snapshot rows, else GitHub API normalized to row shape.
        files = _matching_rows(bundle.pr_files, "pull_request_number", pr_number)
        files_source = "snapshot"
        if not files:
            client = client or _build_github_client()
            files = [
                normalize_pr_file(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_files(owner, repo, pr_number)
            ]
            files_source = "github_api"
        # Unified diff: first matching snapshot row, else fetched and normalized.
        diff_rows = _matching_rows(bundle.pr_diffs, "pull_request_number", pr_number)
        diff_source = "snapshot"
        if diff_rows:
            diff_row = diff_rows[0]
        else:
            client = client or _build_github_client()
            diff_row = normalize_pr_diff(
                bundle.repo,
                pr_number,
                pull_request.get("html_url"),
                pull_request.get("api_url"),
                client.get_pull_request_diff(owner, repo, pr_number),
                bundle.snapshot_id,
                bundle.extracted_at,
            )
            diff_source = "github_api"
        # Discussion (issue-style) comments attached to this PR.
        comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "pull_request"
            and _coerce_int(row.get("parent_number")) == pr_number
        ]
        comments_source = "snapshot"
        if not comments:
            client = client or _build_github_client()
            comments = [
                normalize_comment(
                    bundle.repo,
                    item,
                    "pull_request",
                    pr_number,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_issue_comments_for_number(
                    owner, repo, pr_number, since=None
                )
            ]
            comments_source = "github_api"
        # Review summaries.
        reviews = _matching_rows(bundle.reviews, "pull_request_number", pr_number)
        reviews_source = "snapshot"
        if not reviews:
            client = client or _build_github_client()
            reviews = [
                normalize_review(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_reviews(owner, repo, pr_number)
            ]
            reviews_source = "github_api"
        # Inline review comments.
        review_comments = _matching_rows(bundle.review_comments, "pull_request_number", pr_number)
        review_comments_source = "snapshot"
        if not review_comments:
            client = client or _build_github_client()
            review_comments = [
                normalize_review_comment(
                    bundle.repo,
                    pr_number,
                    item,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_pull_review_comments(owner, repo, pr_number)
            ]
            review_comments_source = "github_api"
        contexts.append(
            {
                "pull_request": pull_request,
                "files": files,
                "diff": diff_row,
                "discussion_comments": comments,
                "reviews": reviews,
                "review_comments": review_comments,
                "context_source": {
                    "files": files_source,
                    "diff": diff_source,
                    "discussion_comments": comments_source,
                    "reviews": reviews_source,
                    "review_comments": review_comments_source,
                },
            }
        )
    return contexts


def _build_issue_context(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
) -> dict[str, Any] | None:
    """Return the target issue row plus its comments, or None without a target.

    The "issue" value may itself be None when the snapshot's issues table has
    no row for the target number; comments are still gathered in that case.
    """
    target_issue_number = selected_cluster.get("target_issue_number")
    if target_issue_number is None:
        return None
    issue_map = {int(row["number"]): row for row in bundle.issues if row.get("number") is not None}
    issue = issue_map.get(int(target_issue_number))
    issue_comments = [
        row
        for row in bundle.comments
        if row.get("parent_kind") == "issue"
        and _coerce_int(row.get("parent_number")) == int(target_issue_number)
    ]
    return {
        "issue": issue,
        "comments": issue_comments,
    }


def _render_prompt(
    *,
    selected_cluster: dict[str, Any],
    selected_cluster_path: Path,
    cluster_context_path: Path,
    pr_context_dir: Path,
    issue_context_path: str | None,
    repo: str,
    default_branch: str,
    file_policy_instruction: str,
) -> str:
    """Render the Codex prompt by substituting {{KEY}} placeholders in the template.

    Placeholders use double-brace syntax in the markdown template; unmatched
    placeholders are left untouched by str.replace.
    """
    template = PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
    replacements = {
        "CLUSTER_ID": selected_cluster["cluster_id"],
        "SOURCE_PR_NUMBERS": ", ".join(
            str(number) for number in selected_cluster["source_pr_numbers"]
        ),
        "SELECTED_CLUSTER_PATH": str(selected_cluster_path),
        "CLUSTER_CONTEXT_PATH": str(cluster_context_path),
        "PR_CONTEXT_DIR": str(pr_context_dir),
        "ISSUE_CONTEXT_PATH": issue_context_path or "No separate issue context file is available.",
        "REPO": repo,
        "DEFAULT_BRANCH": default_branch,
        "FILE_POLICY_INSTRUCTION": file_policy_instruction,
    }
    for key, value in replacements.items():
        # f"{{{{{key}}}}}" expands to the literal token "{{KEY}}".
        template = template.replace(f"{{{{{key}}}}}", value)
    return template


def _render_minimal_pr_body(
    *,
    summary: str,
    target_issue_number: int | None,
    source_pr_numbers: list[int],
    tests_run: list[str],
) -> str:
    """Render the deterministic PR body: summary, optional target issue,
    executed tests, and source PR list. Always ends with a single newline."""
    lines = [summary]
    if target_issue_number is not None:
        lines.extend(["", f"Target issue: #{target_issue_number}."])
    lines.append("")
    lines.append("Tests:")
    lines.extend(f"- `{command}`" for command in tests_run)
    lines.append("")
    lines.append("Source PRs:")
    lines.extend(f"- #{number}" for number in source_pr_numbers)
    return "\n".join(lines).rstrip() + "\n"


def _build_github_client() -> GitHubClient:
    """Construct a real GitHub client using the configured token."""
    return GitHubClient(token=resolve_github_token())


def _infer_default_branch(bundle: SnapshotBundle, selected_cluster: dict[str, Any]) -> str:
    """Infer the target branch from the first source PR with a non-empty base_ref.

    Falls back to "main" when no source PR provides one.
    """
    pull_request_map = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    for pr_number in selected_cluster.get("source_pr_numbers") or []:
        row = pull_request_map.get(int(pr_number))
        base_ref = str((row or {}).get("base_ref") or "").strip()
        if base_ref:
            return base_ref
    return "main"


def _default_file_policy_instruction() -> str:
    """Default file-policy line injected into the prompt when none is supplied."""
    return (
        "Do not touch README files, changelogs, markdown docs, prose-only files, "
        "or commentary artifacts. Fail instead of submitting a noisy branch."
    )


def _matching_rows(rows: list[dict[str, Any]], key: str, value: int) -> list[dict[str, Any]]:
    """Return rows whose *key* field coerces to the integer *value*."""
    matched: list[dict[str, Any]] = []
    for row in rows:
        row_value = _coerce_int(row.get(key))
        if row_value == value:
            matched.append(row)
    return matched


def _is_open_non_draft_pull_request(row: dict[str, Any] | None) -> bool:
    """True when *row* exists, has state == "open", and is not marked draft."""
    return bool(row) and row.get("state") == "open" and not bool(row.get("draft"))


def _ordered_ints(values: Any) -> list[int]:
    """Coerce *values* to ints, dropping non-coercible items and duplicates
    while preserving first-seen order. None/empty input yields []."""
    ordered: list[int] = []
    seen: set[int] = set()
    for value in values or []:
        number = _coerce_int(value)
        if number is None or number in seen:
            continue
        ordered.append(number)
        seen.add(number)
    return ordered


def _coerce_int(value: Any) -> int | None:
    """Best-effort int conversion; returns None for None or non-convertible values."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _split_repo(repo: str) -> tuple[str, str]:
    """Split "owner/name" into (owner, name); raise ValueError on any other shape."""
    owner, _, name = repo.partition("/")
    if not owner or not name:
        raise ValueError(f"Expected repo in owner/name form, got: {repo!r}")
    return owner, name


def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the stage-run and prepare-publish subcommands."""
    parser = argparse.ArgumentParser(prog="python -m slop_farmer.reports.canonical_duplicate_pr")
    subparsers = parser.add_subparsers(dest="command", required=True)
    stage = subparsers.add_parser(
        "stage-run", help="Select an eligible duplicate PR cluster and stage context files."
    )
    stage.add_argument("--report", type=Path, required=True, help="Path to analysis-report.json.")
    stage.add_argument("--run-dir", type=Path, required=True, help="Run artifact directory.")
    stage.add_argument("--cluster-id", help="Optional cluster override.")
    stage.add_argument(
        "--max-clusters",
        type=int,
        default=1,
        help="Maximum number of ranked eligible candidates to record.",
    )
    publish = subparsers.add_parser(
        "prepare-publish", help="Normalize a Codex result into deterministic publish metadata."
    )
    publish.add_argument(
        "--manifest", type=Path, required=True, help="Path to a staged run-manifest.json."
    )
    publish.add_argument(
        "--result", type=Path, required=True, help="Path to the Codex JSON result."
    )
    return parser


def main() -> None:
    """CLI entry point: dispatch to staging or publish prep and print the key artifact path."""
    parser = _build_parser()
    args = parser.parse_args()
    if args.command == "stage-run":
        manifest = stage_run_bundle(
            args.report,
            args.run_dir,
            cluster_id=args.cluster_id,
            max_clusters=args.max_clusters,
        )
        # Print the rendered prompt path for the caller/pipeline to consume.
        print(manifest["artifacts"]["prompt_path"])
        return
    if args.command == "prepare-publish":
        publish_metadata = prepare_publish_artifacts(args.manifest, args.result)
        print(publish_metadata["pr_body_path"])
        return


if __name__ == "__main__":
    main()