# diffusers-pr-api / src/slop_farmer/reports/canonical_duplicate_pr.py
# Deployed by evalstate (HF Staff) — "Deploy Diffusers PR API", commit dbf7313 (verified).
from __future__ import annotations
import argparse
import json
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Protocol
from slop_farmer.config import resolve_github_token
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.normalize import (
normalize_comment,
normalize_pr_diff,
normalize_pr_file,
normalize_review,
normalize_review_comment,
)
from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json, write_text
# Prompt template shipped alongside this module; rendered by _render_prompt().
PROMPT_TEMPLATE_PATH = Path(__file__).resolve().parent / "prompts" / "canonical_duplicate_pr.md"
# JSON schema the Codex synthesis result is expected to conform to.
SCHEMA_PATH = (
    Path(__file__).resolve().parent / "schemas" / "canonical_duplicate_pr_output.schema.json"
)
# Navigation:
# - snapshot loading + ranked cluster selection
# - stage_run_bundle(): prepare prompt/context artifacts for synthesis
# - prepare_publish_artifacts(): turn a successful Codex result into PR-ready output
# - context builders + GitHub fallback helpers
# - tiny CLI at the bottom for manual staging / publish prep
@dataclass(slots=True)
class SnapshotBundle:
    """In-memory view of one extracted repository snapshot plus its analysis report."""

    report_path: Path  # resolved path to the analysis report JSON
    snapshot_dir: Path  # directory holding the report and the parquet tables
    repo: str  # "owner/name" slug; may be "" when absent from report and manifest
    snapshot_id: str  # snapshot identifier; falls back to the snapshot directory name
    extracted_at: str  # extraction timestamp string; may be "" when unknown
    report: dict[str, Any]  # parsed analysis report
    issues: list[dict[str, Any]]  # rows from issues.parquet
    pull_requests: list[dict[str, Any]]  # rows from pull_requests.parquet
    comments: list[dict[str, Any]]  # rows from comments.parquet
    reviews: list[dict[str, Any]]  # rows from reviews.parquet
    review_comments: list[dict[str, Any]]  # rows from review_comments.parquet
    pr_files: list[dict[str, Any]]  # rows from pr_files.parquet
    pr_diffs: list[dict[str, Any]]  # rows from pr_diffs.parquet
class GitHubClientLike(Protocol):
    """Structural interface for the subset of the GitHub client this module calls.

    Any object exposing these methods can be passed as ``github_client`` (e.g. a
    fake for testing); by default a real GitHubClient is built lazily.
    """

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...
    def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ...
    def iter_issue_comments_for_number(
        self, owner: str, repo: str, number: int, since: str | None
    ) -> Iterable[dict[str, Any]]: ...
    def iter_pull_reviews(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...
    def iter_pull_review_comments(
        self, owner: str, repo: str, number: int
    ) -> Iterable[dict[str, Any]]: ...
# Snapshot loading and cluster selection
def load_snapshot_bundle(report_path: Path) -> SnapshotBundle:
    """Load an analysis report plus the parquet tables that live next to it.

    Repo and snapshot metadata are taken from the report first, then the
    optional ``manifest.json``, then (for the snapshot id) the directory name.
    """
    resolved = report_path.resolve()
    snapshot_dir = resolved.parent
    manifest_file = snapshot_dir / "manifest.json"
    manifest: dict[str, Any] = read_json(manifest_file) if manifest_file.exists() else {}
    report = read_json(resolved)

    def _table(name: str) -> list[dict[str, Any]]:
        # Parquet tables are stored as siblings of the report file.
        return read_parquet_rows(snapshot_dir / f"{name}.parquet")

    return SnapshotBundle(
        report_path=resolved,
        snapshot_dir=snapshot_dir,
        repo=str(report.get("repo") or manifest.get("repo") or ""),
        snapshot_id=str(
            report.get("snapshot_id") or manifest.get("snapshot_id") or snapshot_dir.name
        ),
        extracted_at=str(manifest.get("extracted_at") or report.get("generated_at") or ""),
        report=report,
        issues=_table("issues"),
        pull_requests=_table("pull_requests"),
        comments=_table("comments"),
        reviews=_table("reviews"),
        review_comments=_table("review_comments"),
        pr_files=_table("pr_files"),
        pr_diffs=_table("pr_diffs"),
    )
def select_ranked_duplicate_pr_clusters(
    bundle: SnapshotBundle,
    *,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Return ranked duplicate-PR clusters that still have >= 2 open, non-draft PRs.

    Ranking follows the order of ``report["meta_bugs"]``; ``rank_index`` is
    1-based. Meta-bugs without a matching ``duplicate_prs`` entry are skipped.
    At most ``limit`` candidates are returned when ``limit`` is given.
    """
    clusters_by_id: dict[str, dict[str, Any]] = {}
    for row in bundle.report.get("duplicate_prs", []):
        if row.get("cluster_id"):
            clusters_by_id[str(row.get("cluster_id"))] = row
    prs_by_number = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    selected: list[dict[str, Any]] = []
    for rank, meta_bug in enumerate(bundle.report.get("meta_bugs", []), start=1):
        cluster_key = str(meta_bug.get("cluster_id") or "")
        cluster_row = clusters_by_id.get(cluster_key)
        if not cluster_row:
            continue
        pr_numbers = _ordered_ints(meta_bug.get("pr_numbers"))
        open_numbers = [
            number
            for number in pr_numbers
            if _is_open_non_draft_pull_request(prs_by_number.get(number))
        ]
        if len(open_numbers) < 2:
            # A canonical PR only makes sense when multiple live sources remain.
            continue
        selected.append(
            {
                "cluster_id": cluster_key,
                "rank_index": rank,
                "canonical_pr_number": _coerce_int(meta_bug.get("canonical_pr_number")),
                "canonical_issue_number": _coerce_int(meta_bug.get("canonical_issue_number")),
                "target_issue_number": _coerce_int(cluster_row.get("target_issue_number")),
                "all_pr_numbers": pr_numbers,
                "duplicate_pr_numbers": _ordered_ints(cluster_row.get("duplicate_pr_numbers")),
                "source_pr_numbers": open_numbers,
                "issue_numbers": _ordered_ints(meta_bug.get("issue_numbers")),
                "summary": meta_bug.get("summary"),
                "status": meta_bug.get("status"),
                "confidence": meta_bug.get("confidence"),
                "evidence_types": list(meta_bug.get("evidence_types") or []),
                "reason": cluster_row.get("reason"),
            }
        )
        if limit is not None and len(selected) >= limit:
            break
    return selected
def select_ranked_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None = None,
    max_clusters: int = 1,
) -> dict[str, Any]:
    """Pick one eligible cluster: the requested ``cluster_id`` or the top-ranked one.

    Raises ValueError when ``max_clusters`` < 1, when the requested cluster is
    unknown or ineligible, or when no cluster has >= 2 open non-draft PRs.
    """
    if max_clusters < 1:
        raise ValueError("--max-clusters must be at least 1")
    eligible = select_ranked_duplicate_pr_clusters(bundle)
    if cluster_id:
        match = next(
            (candidate for candidate in eligible if candidate["cluster_id"] == cluster_id),
            None,
        )
        if match is not None:
            return match
        # Distinguish "exists but ineligible" from "never heard of it".
        known_cluster_ids = {
            str(row.get("cluster_id"))
            for row in bundle.report.get("duplicate_prs", [])
            if row.get("cluster_id")
        }
        if cluster_id in known_cluster_ids:
            raise ValueError(
                f"Cluster {cluster_id} does not have at least 2 open non-draft pull requests."
            )
        raise ValueError(f"Unknown duplicate PR cluster: {cluster_id}")
    top_candidates = eligible[:max_clusters]
    if not top_candidates:
        raise ValueError("No duplicate PR cluster has at least 2 open non-draft pull requests.")
    return top_candidates[0]
# Prompt/context staging and publish prep
def stage_run_bundle(
    report_path: Path,
    run_dir: Path,
    *,
    selected_cluster: dict[str, Any] | None = None,
    cluster_id: str | None = None,
    max_clusters: int = 1,
    github_client: GitHubClientLike | None = None,
    prompt_repo: str | None = None,
    prompt_default_branch: str | None = None,
    prompt_file_policy_instruction: str | None = None,
) -> dict[str, Any]:
    """Select a duplicate-PR cluster and write all prompt/context artifacts into run_dir.

    Loads the snapshot referenced by ``report_path``, selects a cluster (unless
    ``selected_cluster`` is supplied), writes per-PR context files, the optional
    issue context, the cluster context, the rendered Codex prompt, and a run
    manifest describing every artifact path. Returns the manifest dict.

    The ``prompt_*`` keyword arguments override the values otherwise derived
    from the snapshot (repo slug, inferred default branch, file policy text).
    """
    bundle = load_snapshot_bundle(report_path)
    candidates = select_ranked_duplicate_pr_clusters(bundle)
    if selected_cluster is None:
        selected_cluster = select_ranked_duplicate_pr_cluster(
            bundle,
            cluster_id=cluster_id,
            max_clusters=max_clusters,
        )
    # Build PR/issue context from snapshot rows; the builder falls back to the
    # GitHub API when a table is missing rows for a PR.
    pr_contexts = _build_pull_request_contexts(
        bundle, selected_cluster, github_client=github_client
    )
    issue_context = _build_issue_context(bundle, selected_cluster)
    # Fixed artifact layout under run_dir; every path below is recorded in the
    # manifest written at the end so downstream tooling can find it.
    selected_cluster_path = run_dir / "selected-cluster.json"
    context_dir = run_dir / "context"
    pr_context_dir = context_dir / "prs"
    cluster_context_path = context_dir / "cluster.json"
    issue_context_path = context_dir / "issue.json"
    prompt_path = run_dir / "codex-prompt.md"
    manifest_path = run_dir / "run-manifest.json"
    result_path = run_dir / "codex-final.json"
    last_message_path = run_dir / "codex-last-message.json"
    publish_metadata_path = run_dir / "publish-metadata.json"
    pr_body_path = run_dir / "pr-body.md"
    pr_url_path = run_dir / "pr-url.txt"
    write_json(selected_cluster, selected_cluster_path)
    # One JSON file per source PR, indexed for the cluster context below.
    pr_context_files: list[dict[str, Any]] = []
    for pr_context in pr_contexts:
        pr_number = int(pr_context["pull_request"]["number"])
        path = pr_context_dir / f"pr-{pr_number}.json"
        write_json(pr_context, path)
        pr_context_files.append(
            {
                "pr_number": pr_number,
                "path": str(path.resolve()),
            }
        )
    # The issue context file is only written when a target issue exists.
    issue_context_file: str | None = None
    if issue_context is not None:
        write_json(issue_context, issue_context_path)
        issue_context_file = str(issue_context_path.resolve())
    cluster_context = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "default_branch": prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        "selected_cluster": selected_cluster,
        "target_issue_context_path": issue_context_file,
        "pull_request_context_files": pr_context_files,
    }
    write_json(cluster_context, cluster_context_path)
    prompt_text = _render_prompt(
        selected_cluster=selected_cluster,
        selected_cluster_path=selected_cluster_path.resolve(),
        cluster_context_path=cluster_context_path.resolve(),
        pr_context_dir=pr_context_dir.resolve(),
        issue_context_path=issue_context_file,
        repo=prompt_repo or bundle.repo,
        default_branch=prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        file_policy_instruction=prompt_file_policy_instruction
        or _default_file_policy_instruction(),
    )
    write_text(prompt_text, prompt_path)
    # The manifest records the run configuration plus every artifact path —
    # including paths (result, last message, publish outputs) that are written
    # later by other steps.
    manifest = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "max_clusters": max_clusters,
        "prompt_repo": prompt_repo or bundle.repo,
        "default_branch": prompt_default_branch or _infer_default_branch(bundle, selected_cluster),
        "file_policy_instruction": prompt_file_policy_instruction
        or _default_file_policy_instruction(),
        "candidate_clusters": candidates[:max_clusters] if max_clusters > 0 else [],
        "selected_cluster": selected_cluster,
        "artifacts": {
            "selected_cluster_path": str(selected_cluster_path.resolve()),
            "cluster_context_path": str(cluster_context_path.resolve()),
            "pr_context_dir": str(pr_context_dir.resolve()),
            "issue_context_path": issue_context_file,
            "prompt_path": str(prompt_path.resolve()),
            "schema_path": str(SCHEMA_PATH.resolve()),
            "result_path": str(result_path.resolve()),
            "last_message_path": str(last_message_path.resolve()),
            "publish_metadata_path": str(publish_metadata_path.resolve()),
            "pr_body_path": str(pr_body_path.resolve()),
            "pr_url_path": str(pr_url_path.resolve()),
        },
    }
    write_json(manifest, manifest_path)
    return manifest
def prepare_publish_artifacts(manifest_path: Path, result_path: Path) -> dict[str, Any]:
    """Validate a Codex synthesis result and emit deterministic publish artifacts.

    Reads the staged run manifest and the Codex JSON result, checks the result
    against the selected cluster (status, cluster_id, source PR membership, PR
    title, summary, executed tests), then writes the final PR body and publish
    metadata to the paths recorded in the manifest.

    Returns the publish metadata dict. Raises ValueError on any validation
    failure.
    """
    manifest = read_json(manifest_path.resolve())
    result = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    selected_cluster = manifest["selected_cluster"]
    if result.get("status") != "success":
        raise ValueError("Codex result did not report status=success.")
    if result.get("cluster_id") != selected_cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")
    expected_source_pr_numbers = _ordered_ints(selected_cluster.get("source_pr_numbers"))
    actual_source_pr_numbers = _ordered_ints(result.get("source_pr_numbers"))
    if len(actual_source_pr_numbers) < 2:
        raise ValueError("Codex result must reference at least two open source PRs.")
    expected_source_pr_set = set(expected_source_pr_numbers)
    unknown_source_pr_numbers = [
        number for number in actual_source_pr_numbers if number not in expected_source_pr_set
    ]
    if unknown_source_pr_numbers:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(number) for number in unknown_source_pr_numbers)
        )
    # Re-order the reported PRs to match the expected ranking. Build the
    # membership set once, up front — the previous version rebuilt it inside
    # the comprehension condition on every iteration (accidental O(n^2)).
    actual_source_pr_set = set(actual_source_pr_numbers)
    actual_source_pr_numbers = [
        number for number in expected_source_pr_numbers if number in actual_source_pr_set
    ]
    pr_title = str(result.get("pr_title") or "").strip()
    if not pr_title:
        raise ValueError("Codex result did not provide a PR title.")
    summary = str(result.get("summary") or "").strip()
    if not summary:
        raise ValueError("Codex result did not provide a summary.")
    tests_run = [
        str(value).strip() for value in result.get("tests_run") or [] if str(value).strip()
    ]
    if not tests_run:
        raise ValueError("Codex result did not provide any executed test commands.")
    final_body = _render_minimal_pr_body(
        summary=summary,
        target_issue_number=_coerce_int(selected_cluster.get("target_issue_number")),
        source_pr_numbers=actual_source_pr_numbers,
        tests_run=tests_run,
    )
    pr_body_path = Path(manifest["artifacts"]["pr_body_path"])
    publish_metadata_path = Path(manifest["artifacts"]["publish_metadata_path"])
    write_text(final_body, pr_body_path)
    publish_metadata = {
        "cluster_id": selected_cluster["cluster_id"],
        "canonical_pr_number": selected_cluster.get("canonical_pr_number"),
        "source_pr_numbers": actual_source_pr_numbers,
        "pr_title": pr_title,
        "pr_body_path": str(pr_body_path.resolve()),
        "summary": summary,
        "tests_run": tests_run,
    }
    write_json(publish_metadata, publish_metadata_path)
    return publish_metadata
# Snapshot/GitHub context builders
def _build_pull_request_contexts(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
    *,
    github_client: GitHubClientLike | None = None,
) -> list[dict[str, Any]]:
    """Assemble files/diff/comments/reviews context for each source PR.

    Prefers snapshot parquet rows; for any section missing from the snapshot,
    lazily builds a GitHub client (once, reused for later sections/PRs) and
    fetches + normalizes the data from the API. Each returned context records
    per-section provenance under ``context_source`` ("snapshot" or
    "github_api"). Raises ValueError when a selected PR has no snapshot row.
    """
    pull_request_map = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    owner, repo = _split_repo(bundle.repo)
    contexts: list[dict[str, Any]] = []
    # Client is created on first need and then shared across all fallbacks.
    client = github_client
    for pr_number in selected_cluster["source_pr_numbers"]:
        pull_request = pull_request_map.get(pr_number)
        if pull_request is None:
            raise ValueError(f"Missing pull request row for #{pr_number}")
        # Changed files: snapshot first, API fallback.
        files = _matching_rows(bundle.pr_files, "pull_request_number", pr_number)
        files_source = "snapshot"
        if not files:
            client = client or _build_github_client()
            files = [
                normalize_pr_file(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_files(owner, repo, pr_number)
            ]
            files_source = "github_api"
        # Unified diff: snapshot first (first matching row wins), API fallback.
        diff_rows = _matching_rows(bundle.pr_diffs, "pull_request_number", pr_number)
        diff_source = "snapshot"
        if diff_rows:
            diff_row = diff_rows[0]
        else:
            client = client or _build_github_client()
            diff_row = normalize_pr_diff(
                bundle.repo,
                pr_number,
                pull_request.get("html_url"),
                pull_request.get("api_url"),
                client.get_pull_request_diff(owner, repo, pr_number),
                bundle.snapshot_id,
                bundle.extracted_at,
            )
            diff_source = "github_api"
        # Conversation (issue-style) comments attached to the PR.
        comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "pull_request"
            and _coerce_int(row.get("parent_number")) == pr_number
        ]
        comments_source = "snapshot"
        if not comments:
            client = client or _build_github_client()
            comments = [
                normalize_comment(
                    bundle.repo,
                    item,
                    "pull_request",
                    pr_number,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_issue_comments_for_number(
                    owner, repo, pr_number, since=None
                )
            ]
            comments_source = "github_api"
        # Review summaries.
        reviews = _matching_rows(bundle.reviews, "pull_request_number", pr_number)
        reviews_source = "snapshot"
        if not reviews:
            client = client or _build_github_client()
            reviews = [
                normalize_review(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_reviews(owner, repo, pr_number)
            ]
            reviews_source = "github_api"
        # Inline review comments.
        review_comments = _matching_rows(bundle.review_comments, "pull_request_number", pr_number)
        review_comments_source = "snapshot"
        if not review_comments:
            client = client or _build_github_client()
            review_comments = [
                normalize_review_comment(
                    bundle.repo,
                    pr_number,
                    item,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_pull_review_comments(owner, repo, pr_number)
            ]
            review_comments_source = "github_api"
        contexts.append(
            {
                "pull_request": pull_request,
                "files": files,
                "diff": diff_row,
                "discussion_comments": comments,
                "reviews": reviews,
                "review_comments": review_comments,
                "context_source": {
                    "files": files_source,
                    "diff": diff_source,
                    "discussion_comments": comments_source,
                    "reviews": reviews_source,
                    "review_comments": review_comments_source,
                },
            }
        )
    return contexts
def _build_issue_context(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
) -> dict[str, Any] | None:
    """Collect the target issue row and its comments, or None when no target issue is set."""
    raw_target = selected_cluster.get("target_issue_number")
    if raw_target is None:
        return None
    target_number = int(raw_target)
    issues_by_number = {
        int(row["number"]): row for row in bundle.issues if row.get("number") is not None
    }
    related_comments = [
        row
        for row in bundle.comments
        if row.get("parent_kind") == "issue"
        and _coerce_int(row.get("parent_number")) == target_number
    ]
    return {
        "issue": issues_by_number.get(target_number),
        "comments": related_comments,
    }
def _render_prompt(
    *,
    selected_cluster: dict[str, Any],
    selected_cluster_path: Path,
    cluster_context_path: Path,
    pr_context_dir: Path,
    issue_context_path: str | None,
    repo: str,
    default_branch: str,
    file_policy_instruction: str,
) -> str:
    """Fill the ``{{PLACEHOLDER}}`` slots in the prompt template with run values."""
    rendered = PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
    source_prs = ", ".join(str(number) for number in selected_cluster["source_pr_numbers"])
    substitutions: dict[str, str] = {
        "CLUSTER_ID": selected_cluster["cluster_id"],
        "SOURCE_PR_NUMBERS": source_prs,
        "SELECTED_CLUSTER_PATH": str(selected_cluster_path),
        "CLUSTER_CONTEXT_PATH": str(cluster_context_path),
        "PR_CONTEXT_DIR": str(pr_context_dir),
        "ISSUE_CONTEXT_PATH": issue_context_path or "No separate issue context file is available.",
        "REPO": repo,
        "DEFAULT_BRANCH": default_branch,
        "FILE_POLICY_INSTRUCTION": file_policy_instruction,
    }
    for placeholder, substitution in substitutions.items():
        rendered = rendered.replace("{{" + placeholder + "}}", substitution)
    return rendered
def _render_minimal_pr_body(
*,
summary: str,
target_issue_number: int | None,
source_pr_numbers: list[int],
tests_run: list[str],
) -> str:
lines = [summary]
if target_issue_number is not None:
lines.extend(["", f"Target issue: #{target_issue_number}."])
lines.append("")
lines.append("Tests:")
lines.extend(f"- `{command}`" for command in tests_run)
lines.append("")
lines.append("Source PRs:")
lines.extend(f"- #{number}" for number in source_pr_numbers)
return "\n".join(lines).rstrip() + "\n"
def _build_github_client() -> GitHubClient:
    """Construct a GitHub client authenticated with the configured token."""
    return GitHubClient(token=resolve_github_token())
def _infer_default_branch(bundle: SnapshotBundle, selected_cluster: dict[str, Any]) -> str:
pull_request_map = {
int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
}
for pr_number in selected_cluster.get("source_pr_numbers") or []:
row = pull_request_map.get(int(pr_number))
base_ref = str((row or {}).get("base_ref") or "").strip()
if base_ref:
return base_ref
return "main"
def _default_file_policy_instruction() -> str:
return (
"Do not touch README files, changelogs, markdown docs, prose-only files, "
"or commentary artifacts. Fail instead of submitting a noisy branch."
)
def _matching_rows(rows: list[dict[str, Any]], key: str, value: int) -> list[dict[str, Any]]:
    """Return the rows whose ``key`` column coerces to exactly ``value``."""
    return [row for row in rows if _coerce_int(row.get(key)) == value]
def _is_open_non_draft_pull_request(row: dict[str, Any] | None) -> bool:
return bool(row) and row.get("state") == "open" and not bool(row.get("draft"))
def _ordered_ints(values: Any) -> list[int]:
    """Coerce ``values`` to ints, dropping unconvertible entries and duplicates, keeping order."""
    result: list[int] = []
    encountered: set[int] = set()
    for raw in values or []:
        coerced = _coerce_int(raw)
        if coerced is None or coerced in encountered:
            continue
        encountered.add(coerced)
        result.append(coerced)
    return result
def _coerce_int(value: Any) -> int | None:
if value is None:
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _split_repo(repo: str) -> tuple[str, str]:
owner, _, name = repo.partition("/")
if not owner or not name:
raise ValueError(f"Expected repo in owner/name form, got: {repo!r}")
return owner, name
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="python -m slop_farmer.reports.canonical_duplicate_pr")
subparsers = parser.add_subparsers(dest="command", required=True)
stage = subparsers.add_parser(
"stage-run", help="Select an eligible duplicate PR cluster and stage context files."
)
stage.add_argument("--report", type=Path, required=True, help="Path to analysis-report.json.")
stage.add_argument("--run-dir", type=Path, required=True, help="Run artifact directory.")
stage.add_argument("--cluster-id", help="Optional cluster override.")
stage.add_argument(
"--max-clusters",
type=int,
default=1,
help="Maximum number of ranked eligible candidates to record.",
)
publish = subparsers.add_parser(
"prepare-publish", help="Normalize a Codex result into deterministic publish metadata."
)
publish.add_argument(
"--manifest", type=Path, required=True, help="Path to a staged run-manifest.json."
)
publish.add_argument(
"--result", type=Path, required=True, help="Path to the Codex JSON result."
)
return parser
def main() -> None:
    """CLI entry point: dispatch to stage-run or prepare-publish and print the key artifact path."""
    args = _build_parser().parse_args()
    if args.command == "stage-run":
        staged = stage_run_bundle(
            args.report,
            args.run_dir,
            cluster_id=args.cluster_id,
            max_clusters=args.max_clusters,
        )
        print(staged["artifacts"]["prompt_path"])
    elif args.command == "prepare-publish":
        metadata = prepare_publish_artifacts(args.manifest, args.result)
        print(metadata["pr_body_path"])
# Allow running this module directly for manual staging / publish preparation.
if __name__ == "__main__":
    main()