#!/usr/bin/env python3
"""Compile the product candidate registry from the user governance surface.

Reads ``build/system/user_governance.json``, turns every entry of its
``next_moves`` list into a candidate record, ranks the candidates, and
writes the result to ``build/system/product_candidate_registry.json``.
"""
from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
BUILD_DIR = ROOT / "build" / "system"
USER_GOVERNANCE_PATH = BUILD_DIR / "user_governance.json"
OUTPUT_PATH = BUILD_DIR / "product_candidate_registry.json"

# Move status -> delivery stage; any other status maps to "unknown".
_DELIVERY_STAGES = {
    "implemented": "live_candidate",
    "partial": "prototype",
    "requested": "discovery",
}

# Primitive -> lane. The "memory" entries coincide with the fallback lane and
# are listed only to keep the known primitives enumerated in one place.
_FALLBACK_LANE = "memory"
_LANES = {
    "auto_reuse_prior_tools": "memory",
    "prior_artifact_recall": "memory",
    "history_navigation_surface": "memory",
    "thin_intent_shell": "tooling",
    "route_by_purpose": "tooling",
    "intent_to_packet": "audit",
}


def utc_now() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and return the parsed JSON document.

    The return annotation reflects how callers in this file use the result;
    the top-level JSON type is not validated here.
    """
    return json.loads(path.read_text(encoding="utf-8"))


def delivery_stage(status: str) -> str:
    """Map a move status to its delivery stage, or ``"unknown"``."""
    return _DELIVERY_STAGES.get(status, "unknown")


def default_lane(primitive: str) -> str:
    """Return the lane for *primitive*; unlisted primitives get ``"memory"``."""
    return _LANES.get(primitive, _FALLBACK_LANE)


def build_candidate(move: dict[str, Any]) -> dict[str, Any]:
    """Build one candidate record from a governance ``next_moves`` entry.

    ``primitive``, ``score`` and ``status`` are required (a missing key raises
    ``KeyError``); ``trigger``, ``why``, ``first_surface`` and
    ``expected_artifact`` default to the empty string.
    """
    primitive = move["primitive"]
    status = move["status"]
    return {
        "candidate_id": primitive,
        "title": primitive.replace("_", " "),
        "primitive": primitive,
        "lane": default_lane(primitive),
        "priority_score": move["score"],
        "status": status,
        "delivery_stage": delivery_stage(status),
        "trigger": move.get("trigger", ""),
        "why": move.get("why", ""),
        "first_surface": move.get("first_surface", ""),
        "expected_artifact": move.get("expected_artifact", ""),
        "benchmark_gate": "benchmarks/run_standalone_control_bench.sh",
        "promotion_gate": "receipts + benchmark pass",
    }


def build_registry(governance: dict[str, Any]) -> dict[str, Any]:
    """Assemble the registry document from a parsed governance mapping.

    Candidates are ordered by descending ``priority_score`` with ties broken
    by ascending ``candidate_id``. A missing, ``null`` or empty ``next_moves``
    yields an empty registry instead of failing during iteration.
    """
    moves = governance.get("next_moves") or []
    candidates = [build_candidate(move) for move in moves]
    candidates.sort(key=lambda item: (-item["priority_score"], item["candidate_id"]))
    return {
        "compiled_at": utc_now(),
        "source": str(USER_GOVERNANCE_PATH),
        "one_liner": "Product candidates are reduced delivery slices discovered from user data and ranked for bounded shipping work.",
        "rules": governance.get("governing_rules", []),
        "motif_rule": governance.get("motif_rule"),
        "candidate_count": len(candidates),
        "top_candidate_id": candidates[0]["candidate_id"] if candidates else None,
        "candidates": candidates,
    }


def main() -> int:
    """Compile the registry, write it to ``OUTPUT_PATH`` and print that path.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: if the governance file does not exist.
    """
    # Read first and react to the failure rather than probing with exists():
    # one filesystem access, and no window between the check and the read.
    try:
        governance = load_json(USER_GOVERNANCE_PATH)
    except FileNotFoundError:
        raise SystemExit(
            f"user governance surface missing: {USER_GOVERNANCE_PATH}"
        ) from None

    registry = build_registry(governance)
    BUILD_DIR.mkdir(parents=True, exist_ok=True)
    OUTPUT_PATH.write_text(
        json.dumps(registry, indent=2, sort_keys=True) + "\n",
        encoding="utf-8",
    )
    print(OUTPUT_PATH)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())