# Build a Markdown comparison report from RouterCore evaluation result files.
#
# Every ``*.json`` file in the results directory that carries a
# ``summary_metrics`` object contributes one row to a metrics table, followed
# by a short generated interpretation section.

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

PROJECT_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_RESULTS_DIR = PROJECT_ROOT / "eval" / "results"
DEFAULT_OUTPUT_PATH = PROJECT_ROOT / "docs" / "eval_comparison.md"

# Metrics read from each file's ``summary_metrics``; also the table column order.
METRIC_NAMES = [
    "json_validity_rate",
    "workflow_accuracy",
    "status_accuracy",
    "required_field_presence_accuracy",
    "unsafe_rejection_accuracy",
    "false_route_rate",
]


def load_eval_results(results_dir: Path) -> list[dict[str, Any]]:
    """Load every ``*.json`` eval result found directly inside *results_dir*.

    Args:
        results_dir: Directory to scan. A missing directory yields ``[]``.

    Returns:
        One dict per usable file, in sorted path order, with keys ``name``
        (display label), ``path`` (source file) and ``metrics`` (a mapping
        holding every name in ``METRIC_NAMES``; metrics absent from the file
        are stored as ``None``).

    Raises:
        json.JSONDecodeError: If a matched file does not contain valid JSON.
    """
    if not results_dir.exists():
        return []

    results: list[dict[str, Any]] = []
    for path in sorted(results_dir.glob("*.json")):
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
        # A top-level list/string/number has no ``.get``; skip it the same way
        # a malformed ``summary_metrics`` is skipped below instead of crashing.
        if not isinstance(payload, dict):
            continue
        metrics = payload.get("summary_metrics", {})
        if not isinstance(metrics, dict):
            continue
        results.append(
            {
                "name": _display_name(path, payload),
                "path": path,
                "metrics": {metric: metrics.get(metric) for metric in METRIC_NAMES},
            }
        )
    return results


def build_markdown_table(results: list[dict[str, Any]]) -> str:
    """Render *results* as a Markdown table with one row per result.

    The first column is the result name; the remaining columns follow
    ``METRIC_NAMES`` and are right-aligned. Non-numeric metrics show ``n/a``.
    """
    header = "| Model | " + " | ".join(f"`{metric}`" for metric in METRIC_NAMES) + " |"
    separator = "| --- | " + " | ".join("---:" for _ in METRIC_NAMES) + " |"
    rows = [header, separator]
    for result in results:
        values = [_format_metric(result["metrics"].get(metric)) for metric in METRIC_NAMES]
        # An unescaped pipe inside a cell would be read as a column boundary.
        name = str(result["name"]).replace("|", "\\|")
        rows.append(f"| {name} | " + " | ".join(values) + " |")
    return "\n".join(rows)


def build_interpretation(results: list[dict[str, Any]]) -> str:
    """Return the ``## Interpretation`` section summarising *results*.

    With no results the section tells the reader to run an evaluation first;
    otherwise it lists the best extraction result, the safest result, a
    false-route summary and the suggested next improvement.
    """
    if not results:
        return (
            "## Interpretation\n\n"
            "No eval result JSON files were found. Run one of the evaluation commands first, "
            "then regenerate this comparison report.\n"
        )

    best_extraction = _best_higher(results, "required_field_presence_accuracy")
    safest = _best_safety(results)
    false_route = _false_route_summary(results)
    next_step = _next_improvement(results)
    return (
        "## Interpretation\n\n"
        f"- Best structured extraction: {best_extraction}.\n"
        f"- Safest model: {safest}.\n"
        f"- False route rate: {false_route}.\n"
        f"- Improve next: {next_step}.\n"
    )


def build_report(results: list[dict[str, Any]]) -> str:
    """Assemble the full Markdown report: title, metrics table, interpretation."""
    table = build_markdown_table(results) if results else "_No eval result files found._"
    return (
        "# Evaluation Comparison\n\n"
        "This report compares RouterCore eval result artifacts from `eval/results/`.\n\n"
        "## Metrics\n\n"
        f"{table}\n\n"
        f"{build_interpretation(results)}"
    )


def write_report(
    results_dir: Path = DEFAULT_RESULTS_DIR,
    output_path: Path = DEFAULT_OUTPUT_PATH,
) -> tuple[Path, list[dict[str, Any]], str]:
    """Load results, build the report and write it to *output_path*.

    Parent directories of *output_path* are created when missing.

    Returns:
        ``(output_path, results, report)`` so callers can inspect what was written.
    """
    results = load_eval_results(results_dir)
    report = build_report(results)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    return output_path, results, report


def _display_name(path: Path, payload: dict[str, Any]) -> str:
    """Pick the label for a result: model, then adapter, then a file-based name."""
    if "model" in payload:
        return str(payload["model"])
    if "adapter" in payload:
        # Only the final path component of the adapter location is shown.
        return f"LoRA: {Path(str(payload['adapter'])).name}"
    if path.stem == "fakerouter_eval":
        return "FakeRouter"
    return path.stem


def _is_number(value: Any) -> bool:
    """Return whether *value* is an ``int`` or ``float`` usable as a metric."""
    return isinstance(value, (int, float))


def _format_metric(value: Any) -> str:
    """Format a numeric metric as a two-decimal percentage, anything else as ``n/a``."""
    if _is_number(value):
        return f"{value:.2%}"
    return "n/a"


def _with_metrics(results: list[dict[str, Any]], *metrics: str) -> list[dict[str, Any]]:
    """Return the results in which every named metric holds a numeric value."""
    return [
        result
        for result in results
        if all(_is_number(result["metrics"].get(metric)) for metric in metrics)
    ]


def _best_by(results: list[dict[str, Any]], metric: str, pick: Any) -> str:
    """Describe the results whose *metric* equals ``pick`` (``min``/``max``) of all values.

    Every tied result is named so the summary never hides an equal performer.
    Returns ``"not available"`` when no result has a numeric value for *metric*.
    """
    scored = _with_metrics(results, metric)
    if not scored:
        return "not available"
    best_value = pick(result["metrics"][metric] for result in scored)
    names = ", ".join(
        result["name"] for result in scored if result["metrics"][metric] == best_value
    )
    return f"{names} ({_format_metric(best_value)})"


def _best_higher(results: list[dict[str, Any]], metric: str) -> str:
    """Describe the result(s) with the highest value of *metric*, ties included."""
    return _best_by(results, metric, max)


def _best_lower(results: list[dict[str, Any]], metric: str) -> str:
    """Describe the result(s) with the lowest value of *metric*, ties included."""
    return _best_by(results, metric, min)


def _best_safety(results: list[dict[str, Any]]) -> str:
    """Describe the safest result(s).

    Results are ranked by highest ``unsafe_rejection_accuracy`` and then by
    lowest ``false_route_rate``; only results with both metrics numeric are
    considered, and all results tied on both values are listed.
    """
    scored = _with_metrics(results, "unsafe_rejection_accuracy", "false_route_rate")
    if not scored:
        return "not available"

    best = max(
        scored,
        key=lambda result: (
            result["metrics"]["unsafe_rejection_accuracy"],
            # Negated so that a lower false-route rate ranks higher under max().
            -result["metrics"]["false_route_rate"],
        ),
    )
    best_unsafe = best["metrics"]["unsafe_rejection_accuracy"]
    best_false_route = best["metrics"]["false_route_rate"]
    tied = [
        result
        for result in scored
        if result["metrics"]["unsafe_rejection_accuracy"] == best_unsafe
        and result["metrics"]["false_route_rate"] == best_false_route
    ]
    names = ", ".join(result["name"] for result in tied)
    label = "models" if len(tied) > 1 else "model"
    return (
        f"{names} "
        f"({label}; unsafe rejection {_format_metric(best_unsafe)}, "
        f"false route {_format_metric(best_false_route)})"
    )


def _false_route_summary(results: list[dict[str, Any]]) -> str:
    """Summarise ``false_route_rate``: the best result(s) and, if any, the highest non-zero one."""
    best = _best_lower(results, "false_route_rate")
    nonzero = [
        result
        for result in _with_metrics(results, "false_route_rate")
        if result["metrics"]["false_route_rate"] > 0
    ]
    if not nonzero:
        return f"remained low across available results; best is {best}"
    worst = max(nonzero, key=lambda result: result["metrics"]["false_route_rate"])
    return (
        f"best is {best}; highest observed is {worst['name']} "
        f"({_format_metric(worst['metrics']['false_route_rate'])})"
    )


def _next_improvement(results: list[dict[str, Any]]) -> str:
    """Suggest what to improve next, based on one reference result.

    Only results with numeric ``unsafe_rejection_accuracy``, ``false_route_rate``
    and ``required_field_presence_accuracy`` are eligible. Among those, results
    with perfect unsafe rejection and zero false routes are preferred, and the
    one with the highest ``required_field_presence_accuracy`` becomes the
    reference. If the reference still has false routes that is reported first;
    otherwise the label of its lowest accuracy metric is returned.
    """
    scored = _with_metrics(
        results,
        "unsafe_rejection_accuracy",
        "false_route_rate",
        "required_field_presence_accuracy",
    )
    if not scored:
        return "run at least one evaluation to identify the weakest metric"

    safe_candidates = [
        result
        for result in scored
        if result["metrics"]["unsafe_rejection_accuracy"] == 1.0
        and result["metrics"]["false_route_rate"] == 0.0
    ]
    candidates = safe_candidates or scored
    reference = max(
        candidates,
        key=lambda result: result["metrics"]["required_field_presence_accuracy"],
    )
    if reference["metrics"].get("false_route_rate", 0.0) > 0:
        return "reduce false routes before optimizing convenience metrics"

    weaknesses = {
        "workflow_accuracy": "workflow classification",
        "status_accuracy": "status classification",
        "required_field_presence_accuracy": "structured parameter extraction",
        "unsafe_rejection_accuracy": "unsafe request rejection",
    }

    def score(metric: str) -> float:
        # Loaded results store missing metrics as None, so a plain
        # ``.get(metric, 1.0)`` default never applies and min() would compare
        # None with a float. Treat any non-numeric value as a perfect 1.0.
        value = reference["metrics"].get(metric)
        return value if _is_number(value) else 1.0

    return weaknesses[min(weaknesses, key=score)]


def parse_args() -> argparse.Namespace:
    """Parse the ``--results-dir`` and ``--output`` command-line options."""
    parser = argparse.ArgumentParser(
        description="Compare RouterCore evaluation result JSON files."
    )
    parser.add_argument("--results-dir", type=Path, default=DEFAULT_RESULTS_DIR)
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT_PATH)
    return parser.parse_args()


def main() -> None:
    """Write the comparison report, echo it to stdout and report where it was saved."""
    args = parse_args()
    output_path, results, report = write_report(args.results_dir, args.output)
    print(report)
    print(f"\nWrote comparison report to {output_path}")
    if not results:
        print("No result files were found.")


if __name__ == "__main__":
    main()