"""Stage canonical duplicate-PR synthesis runs and prepare publish artifacts."""
| from __future__ import annotations | |
| import argparse | |
| import json | |
| from collections.abc import Iterable | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Protocol | |
| from slop_farmer.config import resolve_github_token | |
| from slop_farmer.data.github_api import GitHubClient | |
| from slop_farmer.data.normalize import ( | |
| normalize_comment, | |
| normalize_pr_diff, | |
| normalize_pr_file, | |
| normalize_review, | |
| normalize_review_comment, | |
| ) | |
| from slop_farmer.data.parquet_io import read_json, read_parquet_rows, write_json, write_text | |
# Prompt template rendered by stage_run_bundle() via _render_prompt().
PROMPT_TEMPLATE_PATH = Path(__file__).resolve().parent / "prompts" / "canonical_duplicate_pr.md"
# JSON schema for the expected Codex output; its path is recorded in the run manifest.
SCHEMA_PATH = (
    Path(__file__).resolve().parent / "schemas" / "canonical_duplicate_pr_output.schema.json"
)
| # Navigation: | |
| # - snapshot loading + ranked cluster selection | |
| # - stage_run_bundle(): prepare prompt/context artifacts for synthesis | |
| # - prepare_publish_artifacts(): turn a successful Codex result into PR-ready output | |
| # - context builders + GitHub fallback helpers | |
| # - tiny CLI at the bottom for manual staging / publish prep | |
@dataclass
class SnapshotBundle:
    """In-memory view of one analysis snapshot: the report JSON plus parquet tables.

    Constructed with keyword arguments in load_snapshot_bundle(); the class had
    only annotations and no generated __init__, so the @dataclass decorator is
    required for that construction to work.
    """

    report_path: Path  # resolved path to analysis-report.json
    snapshot_dir: Path  # directory containing the report and parquet files
    repo: str  # "owner/name" form (may be "" when unknown)
    snapshot_id: str
    extracted_at: str
    report: dict[str, Any]  # parsed analysis report
    issues: list[dict[str, Any]]
    pull_requests: list[dict[str, Any]]
    comments: list[dict[str, Any]]
    reviews: list[dict[str, Any]]
    review_comments: list[dict[str, Any]]
    pr_files: list[dict[str, Any]]
    pr_diffs: list[dict[str, Any]]
class GitHubClientLike(Protocol):
    """Structural interface for the subset of the GitHub client used as a
    live-API fallback when snapshot parquet tables are missing rows."""

    def iter_pull_files(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...

    def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ...

    def iter_issue_comments_for_number(
        self, owner: str, repo: str, number: int, since: str | None
    ) -> Iterable[dict[str, Any]]: ...

    def iter_pull_reviews(self, owner: str, repo: str, number: int) -> Iterable[dict[str, Any]]: ...

    def iter_pull_review_comments(
        self, owner: str, repo: str, number: int
    ) -> Iterable[dict[str, Any]]: ...
| # Snapshot loading and cluster selection | |
def load_snapshot_bundle(report_path: Path) -> SnapshotBundle:
    """Read an analysis report and its sibling parquet tables into a SnapshotBundle.

    Metadata fields prefer the report, then the optional manifest.json, then
    directory-derived fallbacks.
    """
    report_file = report_path.resolve()
    snapshot_root = report_file.parent
    report_data = read_json(report_file)
    manifest_file = snapshot_root / "manifest.json"
    manifest_data = read_json(manifest_file) if manifest_file.exists() else {}

    def table(name: str) -> list[dict[str, Any]]:
        # All parquet tables live next to the report file.
        return read_parquet_rows(snapshot_root / name)

    return SnapshotBundle(
        report_path=report_file,
        snapshot_dir=snapshot_root,
        repo=str(report_data.get("repo") or manifest_data.get("repo") or ""),
        snapshot_id=str(
            report_data.get("snapshot_id")
            or manifest_data.get("snapshot_id")
            or snapshot_root.name
        ),
        extracted_at=str(
            manifest_data.get("extracted_at") or report_data.get("generated_at") or ""
        ),
        report=report_data,
        issues=table("issues.parquet"),
        pull_requests=table("pull_requests.parquet"),
        comments=table("comments.parquet"),
        reviews=table("reviews.parquet"),
        review_comments=table("review_comments.parquet"),
        pr_files=table("pr_files.parquet"),
        pr_diffs=table("pr_diffs.parquet"),
    )
def select_ranked_duplicate_pr_clusters(
    bundle: SnapshotBundle,
    *,
    limit: int | None = None,
) -> list[dict[str, Any]]:
    """Return ranked meta-bug clusters that have at least two open non-draft PRs.

    Rank order follows the report's meta_bugs list; clusters missing from the
    duplicate_prs table or with fewer than two eligible PRs are skipped.
    """
    clusters_by_id: dict[str, dict[str, Any]] = {}
    for row in bundle.report.get("duplicate_prs", []):
        if row.get("cluster_id"):
            clusters_by_id[str(row.get("cluster_id"))] = row
    prs_by_number = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    selected: list[dict[str, Any]] = []
    for rank, meta_bug in enumerate(bundle.report.get("meta_bugs", []), start=1):
        key = str(meta_bug.get("cluster_id") or "")
        cluster_row = clusters_by_id.get(key)
        if not cluster_row:
            continue
        pr_numbers = _ordered_ints(meta_bug.get("pr_numbers"))
        open_numbers = [
            n for n in pr_numbers if _is_open_non_draft_pull_request(prs_by_number.get(n))
        ]
        if len(open_numbers) < 2:
            continue
        selected.append(
            {
                "cluster_id": key,
                "rank_index": rank,
                "canonical_pr_number": _coerce_int(meta_bug.get("canonical_pr_number")),
                "canonical_issue_number": _coerce_int(meta_bug.get("canonical_issue_number")),
                "target_issue_number": _coerce_int(cluster_row.get("target_issue_number")),
                "all_pr_numbers": pr_numbers,
                "duplicate_pr_numbers": _ordered_ints(cluster_row.get("duplicate_pr_numbers")),
                "source_pr_numbers": open_numbers,
                "issue_numbers": _ordered_ints(meta_bug.get("issue_numbers")),
                "summary": meta_bug.get("summary"),
                "status": meta_bug.get("status"),
                "confidence": meta_bug.get("confidence"),
                "evidence_types": list(meta_bug.get("evidence_types") or []),
                "reason": cluster_row.get("reason"),
            }
        )
        if limit is not None and len(selected) >= limit:
            break
    return selected
def select_ranked_duplicate_pr_cluster(
    bundle: SnapshotBundle,
    *,
    cluster_id: str | None = None,
    max_clusters: int = 1,
) -> dict[str, Any]:
    """Pick one eligible cluster: the requested *cluster_id*, or the top-ranked one.

    Raises ValueError for an invalid max_clusters, an unknown or ineligible
    cluster_id, or when no cluster has at least two open non-draft PRs.
    """
    if max_clusters < 1:
        raise ValueError("--max-clusters must be at least 1")
    eligible = select_ranked_duplicate_pr_clusters(bundle)
    if cluster_id:
        match = next(
            (candidate for candidate in eligible if candidate["cluster_id"] == cluster_id),
            None,
        )
        if match is not None:
            return match
        # Distinguish "known but ineligible" from "never heard of it".
        known = {
            str(row.get("cluster_id"))
            for row in bundle.report.get("duplicate_prs", [])
            if row.get("cluster_id")
        }
        if cluster_id in known:
            raise ValueError(
                f"Cluster {cluster_id} does not have at least 2 open non-draft pull requests."
            )
        raise ValueError(f"Unknown duplicate PR cluster: {cluster_id}")
    top = eligible[:max_clusters]
    if not top:
        raise ValueError("No duplicate PR cluster has at least 2 open non-draft pull requests.")
    return top[0]
| # Prompt/context staging and publish prep | |
def stage_run_bundle(
    report_path: Path,
    run_dir: Path,
    *,
    selected_cluster: dict[str, Any] | None = None,
    cluster_id: str | None = None,
    max_clusters: int = 1,
    github_client: GitHubClientLike | None = None,
    prompt_repo: str | None = None,
    prompt_default_branch: str | None = None,
    prompt_file_policy_instruction: str | None = None,
) -> dict[str, Any]:
    """Select a duplicate-PR cluster and stage all prompt/context artifacts in *run_dir*.

    Writes the selected cluster, per-PR contexts, optional issue context, the
    rendered Codex prompt, and a run manifest describing every artifact path.
    Returns the manifest dict (also persisted as run-manifest.json). Cluster
    selection may raise ValueError (see select_ranked_duplicate_pr_cluster).
    """
    bundle = load_snapshot_bundle(report_path)
    candidates = select_ranked_duplicate_pr_clusters(bundle)
    if selected_cluster is None:
        selected_cluster = select_ranked_duplicate_pr_cluster(
            bundle,
            cluster_id=cluster_id,
            max_clusters=max_clusters,
        )

    # Resolve prompt-facing settings once. The original recomputed the default
    # branch up to three times and the file policy twice; the helpers are pure,
    # so hoisting them is behavior-preserving.
    effective_repo = prompt_repo or bundle.repo
    default_branch = prompt_default_branch or _infer_default_branch(bundle, selected_cluster)
    file_policy = prompt_file_policy_instruction or _default_file_policy_instruction()

    pr_contexts = _build_pull_request_contexts(
        bundle, selected_cluster, github_client=github_client
    )
    issue_context = _build_issue_context(bundle, selected_cluster)

    # Artifact layout under run_dir (result/last-message/publish paths are
    # reserved here so later stages know where to write).
    selected_cluster_path = run_dir / "selected-cluster.json"
    context_dir = run_dir / "context"
    pr_context_dir = context_dir / "prs"
    cluster_context_path = context_dir / "cluster.json"
    issue_context_path = context_dir / "issue.json"
    prompt_path = run_dir / "codex-prompt.md"
    manifest_path = run_dir / "run-manifest.json"
    result_path = run_dir / "codex-final.json"
    last_message_path = run_dir / "codex-last-message.json"
    publish_metadata_path = run_dir / "publish-metadata.json"
    pr_body_path = run_dir / "pr-body.md"
    pr_url_path = run_dir / "pr-url.txt"

    write_json(selected_cluster, selected_cluster_path)

    # One context file per source PR, recorded with its absolute path.
    pr_context_files: list[dict[str, Any]] = []
    for pr_context in pr_contexts:
        pr_number = int(pr_context["pull_request"]["number"])
        path = pr_context_dir / f"pr-{pr_number}.json"
        write_json(pr_context, path)
        pr_context_files.append(
            {
                "pr_number": pr_number,
                "path": str(path.resolve()),
            }
        )

    # The issue context file only exists when a target issue was resolved.
    issue_context_file: str | None = None
    if issue_context is not None:
        write_json(issue_context, issue_context_path)
        issue_context_file = str(issue_context_path.resolve())

    cluster_context = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "default_branch": default_branch,
        "selected_cluster": selected_cluster,
        "target_issue_context_path": issue_context_file,
        "pull_request_context_files": pr_context_files,
    }
    write_json(cluster_context, cluster_context_path)

    prompt_text = _render_prompt(
        selected_cluster=selected_cluster,
        selected_cluster_path=selected_cluster_path.resolve(),
        cluster_context_path=cluster_context_path.resolve(),
        pr_context_dir=pr_context_dir.resolve(),
        issue_context_path=issue_context_file,
        repo=effective_repo,
        default_branch=default_branch,
        file_policy_instruction=file_policy,
    )
    write_text(prompt_text, prompt_path)

    manifest = {
        "report_path": str(bundle.report_path),
        "snapshot_dir": str(bundle.snapshot_dir),
        "repo": bundle.repo,
        "snapshot_id": bundle.snapshot_id,
        "max_clusters": max_clusters,
        "prompt_repo": effective_repo,
        "default_branch": default_branch,
        "file_policy_instruction": file_policy,
        "candidate_clusters": candidates[:max_clusters] if max_clusters > 0 else [],
        "selected_cluster": selected_cluster,
        "artifacts": {
            "selected_cluster_path": str(selected_cluster_path.resolve()),
            "cluster_context_path": str(cluster_context_path.resolve()),
            "pr_context_dir": str(pr_context_dir.resolve()),
            "issue_context_path": issue_context_file,
            "prompt_path": str(prompt_path.resolve()),
            "schema_path": str(SCHEMA_PATH.resolve()),
            "result_path": str(result_path.resolve()),
            "last_message_path": str(last_message_path.resolve()),
            "publish_metadata_path": str(publish_metadata_path.resolve()),
            "pr_body_path": str(pr_body_path.resolve()),
            "pr_url_path": str(pr_url_path.resolve()),
        },
    }
    write_json(manifest, manifest_path)
    return manifest
def prepare_publish_artifacts(manifest_path: Path, result_path: Path) -> dict[str, Any]:
    """Validate a Codex result against the staged manifest and write publish files.

    Checks status, cluster identity, source PR membership, and required text
    fields, then writes the PR body and publish metadata to the paths recorded
    in the manifest. Raises ValueError on any validation failure.
    """
    manifest = read_json(manifest_path.resolve())
    result = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    selected_cluster = manifest["selected_cluster"]

    if result.get("status") != "success":
        raise ValueError("Codex result did not report status=success.")
    if result.get("cluster_id") != selected_cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")

    expected = _ordered_ints(selected_cluster.get("source_pr_numbers"))
    reported = _ordered_ints(result.get("source_pr_numbers"))
    if len(reported) < 2:
        raise ValueError("Codex result must reference at least two open source PRs.")
    allowed = set(expected)
    extras = [n for n in reported if n not in allowed]
    if extras:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(n) for n in extras)
        )
    # Re-order the reported PRs to follow the expected (ranked) ordering.
    reported_set = set(reported)
    source_pr_numbers = [n for n in expected if n in reported_set]

    pr_title = str(result.get("pr_title") or "").strip()
    if not pr_title:
        raise ValueError("Codex result did not provide a PR title.")
    summary = str(result.get("summary") or "").strip()
    if not summary:
        raise ValueError("Codex result did not provide a summary.")
    tests_run = [str(v).strip() for v in result.get("tests_run") or [] if str(v).strip()]
    if not tests_run:
        raise ValueError("Codex result did not provide any executed test commands.")

    final_body = _render_minimal_pr_body(
        summary=summary,
        target_issue_number=_coerce_int(selected_cluster.get("target_issue_number")),
        source_pr_numbers=source_pr_numbers,
        tests_run=tests_run,
    )
    pr_body_path = Path(manifest["artifacts"]["pr_body_path"])
    publish_metadata_path = Path(manifest["artifacts"]["publish_metadata_path"])
    write_text(final_body, pr_body_path)
    publish_metadata = {
        "cluster_id": selected_cluster["cluster_id"],
        "canonical_pr_number": selected_cluster.get("canonical_pr_number"),
        "source_pr_numbers": source_pr_numbers,
        "pr_title": pr_title,
        "pr_body_path": str(pr_body_path.resolve()),
        "summary": summary,
        "tests_run": tests_run,
    }
    write_json(publish_metadata, publish_metadata_path)
    return publish_metadata
| # Snapshot/GitHub context builders | |
def _build_pull_request_contexts(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
    *,
    github_client: GitHubClientLike | None = None,
) -> list[dict[str, Any]]:
    """Assemble per-PR context (files, diff, comments, reviews) for each source PR.

    Each section prefers snapshot parquet rows; when a section is empty it falls
    back to live GitHub API calls and normalizes the responses. The returned
    context records, per section, whether it came from "snapshot" or
    "github_api". Raises ValueError when a source PR row is missing entirely.
    """
    pull_request_map = {
        int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None
    }
    owner, repo = _split_repo(bundle.repo)
    contexts: list[dict[str, Any]] = []
    # Reuse the caller-provided client; build one lazily only on first fallback.
    client = github_client
    for pr_number in selected_cluster["source_pr_numbers"]:
        pull_request = pull_request_map.get(pr_number)
        if pull_request is None:
            raise ValueError(f"Missing pull request row for #{pr_number}")
        # Changed files: snapshot rows first, GitHub API fallback.
        files = _matching_rows(bundle.pr_files, "pull_request_number", pr_number)
        files_source = "snapshot"
        if not files:
            client = client or _build_github_client()
            files = [
                normalize_pr_file(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_files(owner, repo, pr_number)
            ]
            files_source = "github_api"
        # Unified diff: first snapshot row wins; otherwise fetch and normalize.
        diff_rows = _matching_rows(bundle.pr_diffs, "pull_request_number", pr_number)
        diff_source = "snapshot"
        if diff_rows:
            diff_row = diff_rows[0]
        else:
            client = client or _build_github_client()
            diff_row = normalize_pr_diff(
                bundle.repo,
                pr_number,
                pull_request.get("html_url"),
                pull_request.get("api_url"),
                client.get_pull_request_diff(owner, repo, pr_number),
                bundle.snapshot_id,
                bundle.extracted_at,
            )
            diff_source = "github_api"
        # Issue-style discussion comments attached to this PR.
        comments = [
            row
            for row in bundle.comments
            if row.get("parent_kind") == "pull_request"
            and _coerce_int(row.get("parent_number")) == pr_number
        ]
        comments_source = "snapshot"
        if not comments:
            client = client or _build_github_client()
            comments = [
                normalize_comment(
                    bundle.repo,
                    item,
                    "pull_request",
                    pr_number,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_issue_comments_for_number(
                    owner, repo, pr_number, since=None
                )
            ]
            comments_source = "github_api"
        # Review summaries.
        reviews = _matching_rows(bundle.reviews, "pull_request_number", pr_number)
        reviews_source = "snapshot"
        if not reviews:
            client = client or _build_github_client()
            reviews = [
                normalize_review(
                    bundle.repo, pr_number, item, bundle.snapshot_id, bundle.extracted_at
                )
                for item in client.iter_pull_reviews(owner, repo, pr_number)
            ]
            reviews_source = "github_api"
        # Inline (code-anchored) review comments.
        review_comments = _matching_rows(bundle.review_comments, "pull_request_number", pr_number)
        review_comments_source = "snapshot"
        if not review_comments:
            client = client or _build_github_client()
            review_comments = [
                normalize_review_comment(
                    bundle.repo,
                    pr_number,
                    item,
                    bundle.snapshot_id,
                    bundle.extracted_at,
                )
                for item in client.iter_pull_review_comments(owner, repo, pr_number)
            ]
            review_comments_source = "github_api"
        contexts.append(
            {
                "pull_request": pull_request,
                "files": files,
                "diff": diff_row,
                "discussion_comments": comments,
                "reviews": reviews,
                "review_comments": review_comments,
                "context_source": {
                    "files": files_source,
                    "diff": diff_source,
                    "discussion_comments": comments_source,
                    "reviews": reviews_source,
                    "review_comments": review_comments_source,
                },
            }
        )
    return contexts
def _build_issue_context(
    bundle: SnapshotBundle,
    selected_cluster: dict[str, Any],
) -> dict[str, Any] | None:
    """Return the target issue row and its comments, or None when no target issue."""
    raw_number = selected_cluster.get("target_issue_number")
    if raw_number is None:
        return None
    issue_number = int(raw_number)
    issue_row = None
    for row in bundle.issues:
        # Last matching row wins, mirroring the original dict construction.
        if row.get("number") is not None and int(row["number"]) == issue_number:
            issue_row = row
    related_comments = [
        row
        for row in bundle.comments
        if row.get("parent_kind") == "issue"
        and _coerce_int(row.get("parent_number")) == issue_number
    ]
    return {
        "issue": issue_row,
        "comments": related_comments,
    }
def _render_prompt(
    *,
    selected_cluster: dict[str, Any],
    selected_cluster_path: Path,
    cluster_context_path: Path,
    pr_context_dir: Path,
    issue_context_path: str | None,
    repo: str,
    default_branch: str,
    file_policy_instruction: str,
) -> str:
    """Fill the canonical duplicate-PR prompt template with run-specific values."""
    text = PROMPT_TEMPLATE_PATH.read_text(encoding="utf-8")
    source_numbers = ", ".join(str(n) for n in selected_cluster["source_pr_numbers"])
    substitutions = {
        "CLUSTER_ID": selected_cluster["cluster_id"],
        "SOURCE_PR_NUMBERS": source_numbers,
        "SELECTED_CLUSTER_PATH": str(selected_cluster_path),
        "CLUSTER_CONTEXT_PATH": str(cluster_context_path),
        "PR_CONTEXT_DIR": str(pr_context_dir),
        "ISSUE_CONTEXT_PATH": issue_context_path or "No separate issue context file is available.",
        "REPO": repo,
        "DEFAULT_BRANCH": default_branch,
        "FILE_POLICY_INSTRUCTION": file_policy_instruction,
    }
    # Placeholders in the template look like {{NAME}}.
    for placeholder, value in substitutions.items():
        text = text.replace("{{" + placeholder + "}}", value)
    return text
| def _render_minimal_pr_body( | |
| *, | |
| summary: str, | |
| target_issue_number: int | None, | |
| source_pr_numbers: list[int], | |
| tests_run: list[str], | |
| ) -> str: | |
| lines = [summary] | |
| if target_issue_number is not None: | |
| lines.extend(["", f"Target issue: #{target_issue_number}."]) | |
| lines.append("") | |
| lines.append("Tests:") | |
| lines.extend(f"- `{command}`" for command in tests_run) | |
| lines.append("") | |
| lines.append("Source PRs:") | |
| lines.extend(f"- #{number}" for number in source_pr_numbers) | |
| return "\n".join(lines).rstrip() + "\n" | |
def _build_github_client() -> GitHubClient:
    """Construct a GitHubClient from the configured token (used for API fallbacks)."""
    return GitHubClient(token=resolve_github_token())
| def _infer_default_branch(bundle: SnapshotBundle, selected_cluster: dict[str, Any]) -> str: | |
| pull_request_map = { | |
| int(row["number"]): row for row in bundle.pull_requests if row.get("number") is not None | |
| } | |
| for pr_number in selected_cluster.get("source_pr_numbers") or []: | |
| row = pull_request_map.get(int(pr_number)) | |
| base_ref = str((row or {}).get("base_ref") or "").strip() | |
| if base_ref: | |
| return base_ref | |
| return "main" | |
| def _default_file_policy_instruction() -> str: | |
| return ( | |
| "Do not touch README files, changelogs, markdown docs, prose-only files, " | |
| "or commentary artifacts. Fail instead of submitting a noisy branch." | |
| ) | |
def _matching_rows(rows: list[dict[str, Any]], key: str, value: int) -> list[dict[str, Any]]:
    """Return the rows whose *key* field coerces to exactly *value*.

    Coercion via _coerce_int lets string- or float-typed snapshot columns match
    an int key. The manual append loop was replaced with the equivalent
    comprehension (same order, same results).
    """
    return [row for row in rows if _coerce_int(row.get(key)) == value]
| def _is_open_non_draft_pull_request(row: dict[str, Any] | None) -> bool: | |
| return bool(row) and row.get("state") == "open" and not bool(row.get("draft")) | |
def _ordered_ints(values: Any) -> list[int]:
    """Coerce *values* to ints, dropping uncoercible items and duplicates.

    First-seen order is preserved; a None/empty input yields an empty list.
    """
    result: list[int] = []
    seen: set[int] = set()
    for raw in values or []:
        coerced = _coerce_int(raw)
        if coerced is not None and coerced not in seen:
            seen.add(coerced)
            result.append(coerced)
    return result
| def _coerce_int(value: Any) -> int | None: | |
| if value is None: | |
| return None | |
| try: | |
| return int(value) | |
| except (TypeError, ValueError): | |
| return None | |
| def _split_repo(repo: str) -> tuple[str, str]: | |
| owner, _, name = repo.partition("/") | |
| if not owner or not name: | |
| raise ValueError(f"Expected repo in owner/name form, got: {repo!r}") | |
| return owner, name | |
| def _build_parser() -> argparse.ArgumentParser: | |
| parser = argparse.ArgumentParser(prog="python -m slop_farmer.reports.canonical_duplicate_pr") | |
| subparsers = parser.add_subparsers(dest="command", required=True) | |
| stage = subparsers.add_parser( | |
| "stage-run", help="Select an eligible duplicate PR cluster and stage context files." | |
| ) | |
| stage.add_argument("--report", type=Path, required=True, help="Path to analysis-report.json.") | |
| stage.add_argument("--run-dir", type=Path, required=True, help="Run artifact directory.") | |
| stage.add_argument("--cluster-id", help="Optional cluster override.") | |
| stage.add_argument( | |
| "--max-clusters", | |
| type=int, | |
| default=1, | |
| help="Maximum number of ranked eligible candidates to record.", | |
| ) | |
| publish = subparsers.add_parser( | |
| "prepare-publish", help="Normalize a Codex result into deterministic publish metadata." | |
| ) | |
| publish.add_argument( | |
| "--manifest", type=Path, required=True, help="Path to a staged run-manifest.json." | |
| ) | |
| publish.add_argument( | |
| "--result", type=Path, required=True, help="Path to the Codex JSON result." | |
| ) | |
| return parser | |
def main() -> None:
    """CLI entry point: dispatch to run staging or publish preparation.

    Prints the staged prompt path (stage-run) or the PR body path
    (prepare-publish) for downstream tooling to consume.
    """
    args = _build_parser().parse_args()
    if args.command == "stage-run":
        staged = stage_run_bundle(
            args.report,
            args.run_dir,
            cluster_id=args.cluster_id,
            max_clusters=args.max_clusters,
        )
        print(staged["artifacts"]["prompt_path"])
    elif args.command == "prepare-publish":
        metadata = prepare_publish_artifacts(args.manifest, args.result)
        print(metadata["pr_body_path"])
| if __name__ == "__main__": | |
| main() | |