# routercore/eval/compare_results.py
# Jayteare's picture
# Deploy RouterCore Gradio demo
# 1137e50 verified
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this module.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Default location scanned for eval result JSON files.
DEFAULT_RESULTS_DIR = PROJECT_ROOT.joinpath("eval", "results")

# Default destination of the generated Markdown comparison report.
DEFAULT_OUTPUT_PATH = PROJECT_ROOT.joinpath("docs", "eval_comparison.md")

# Keys read from each result file's "summary_metrics" object; the order here
# is also the column order of the generated table.
METRIC_NAMES = [
    "json_validity_rate",
    "workflow_accuracy",
    "status_accuracy",
    "required_field_presence_accuracy",
    "unsafe_rejection_accuracy",
    "false_route_rate",
]
def load_eval_results(results_dir: Path) -> list[dict[str, Any]]:
    """Load summary metrics from every ``*.json`` file in *results_dir*.

    Files are visited in sorted path order. A file is skipped when its
    top-level JSON value is not an object, or when its ``summary_metrics``
    entry is not an object. Malformed JSON is not swallowed: the
    ``json.JSONDecodeError`` raised by ``json.load`` propagates.

    Args:
        results_dir: Directory to scan (non-recursively) for result files.

    Returns:
        One dict per accepted file with the keys ``name`` (display label),
        ``path`` (the source file) and ``metrics`` (a dict holding every
        entry of ``METRIC_NAMES``, with ``None`` for metrics the file does
        not provide). An empty list when *results_dir* does not exist.
    """
    if not results_dir.exists():
        return []

    results: list[dict[str, Any]] = []
    for path in sorted(results_dir.glob("*.json")):
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)

        # Valid JSON may be a list, string, number, ... at the top level;
        # only an object can carry "summary_metrics", so skip anything else
        # instead of failing on ``payload.get``.
        if not isinstance(payload, dict):
            continue

        metrics = payload.get("summary_metrics", {})
        if not isinstance(metrics, dict):
            continue

        results.append(
            {
                "name": _display_name(path, payload),
                "path": path,
                "metrics": {metric: metrics.get(metric) for metric in METRIC_NAMES},
            }
        )
    return results
def build_markdown_table(results: list[dict[str, Any]]) -> str:
    """Render *results* as a Markdown table with one row per result.

    The first column is the result's ``name``; the remaining columns follow
    ``METRIC_NAMES`` and are right-aligned. Each metric cell is produced by
    ``_format_metric``.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        The header line, the alignment line and the data rows joined with
        newlines (no trailing newline).
    """
    header_cells = " | ".join(f"`{metric}`" for metric in METRIC_NAMES)
    alignment_cells = " | ".join(["---:"] * len(METRIC_NAMES))
    lines = [
        f"| Model | {header_cells} |",
        f"| --- | {alignment_cells} |",
    ]
    for entry in results:
        metric_cells = " | ".join(
            _format_metric(entry["metrics"].get(metric)) for metric in METRIC_NAMES
        )
        lines.append(f"| {entry['name']} | {metric_cells} |")
    return "\n".join(lines)
def build_interpretation(results: list[dict[str, Any]]) -> str:
    """Build the "Interpretation" section of the report.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        A Markdown section starting with a ``## Interpretation`` heading.
        With no results it holds a hint to run an evaluation first;
        otherwise it holds four bullet points (best extraction, safest
        model, false route rate, next improvement).
    """
    heading = "## Interpretation\n\n"
    if not results:
        return (
            heading
            + "No eval result JSON files were found. Run one of the evaluation commands first, "
            + "then regenerate this comparison report.\n"
        )

    # The helpers are evaluated in the order the bullets appear.
    bullets = [
        f"- Best structured extraction: {_best_higher(results, 'required_field_presence_accuracy')}.\n",
        f"- Safest model: {_best_safety(results)}.\n",
        f"- False route rate: {_false_route_summary(results)}.\n",
        f"- Improve next: {_next_improvement(results)}.\n",
    ]
    return heading + "".join(bullets)
def build_report(results: list[dict[str, Any]]) -> str:
    """Assemble the full Markdown comparison report.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        The report text: a title, an introductory sentence, a "Metrics"
        section (the table, or a placeholder when *results* is empty) and
        the interpretation section.
    """
    if results:
        metrics_section = build_markdown_table(results)
    else:
        metrics_section = "_No eval result files found._"

    parts = [
        "# Evaluation Comparison\n\n",
        "This report compares RouterCore eval result artifacts from `eval/results/`.\n\n",
        "## Metrics\n\n",
        f"{metrics_section}\n\n",
        build_interpretation(results),
    ]
    return "".join(parts)
def write_report(
    results_dir: Path = DEFAULT_RESULTS_DIR,
    output_path: Path = DEFAULT_OUTPUT_PATH,
) -> tuple[Path, list[dict[str, Any]], str]:
    """Load results, build the report and write it to *output_path*.

    Missing parent directories of *output_path* are created. The file is
    written as UTF-8.

    Args:
        results_dir: Directory scanned for eval result JSON files.
        output_path: Destination of the Markdown report.

    Returns:
        A ``(output_path, results, report)`` tuple: the path written to,
        the loaded result entries and the report text.
    """
    loaded = load_eval_results(results_dir)
    markdown = build_report(loaded)

    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    output_path.write_text(markdown, encoding="utf-8")

    return output_path, loaded, markdown
def _display_name(path: Path, payload: dict[str, Any]) -> str:
    """Choose the label shown for one result file.

    Precedence: the payload's ``model`` value, then ``LoRA: <last path
    component of the payload's adapter value>``, then the literal
    ``FakeRouter`` for a file whose stem is ``fakerouter_eval``, and finally
    the file stem itself.

    Args:
        path: Location of the result file.
        payload: Parsed JSON content of that file.

    Returns:
        The display label.
    """
    if "model" in payload:
        return str(payload["model"])

    if "adapter" in payload:
        adapter_path = Path(str(payload["adapter"]))
        return f"LoRA: {adapter_path.name}"

    stem = path.stem
    return "FakeRouter" if stem == "fakerouter_eval" else stem
def _format_metric(value: Any) -> str:
    """Format a metric as a percentage with two decimals.

    Args:
        value: The raw metric value.

    Returns:
        ``value`` rendered with the ``.2%`` format when it is an ``int`` or
        ``float``; the string ``"n/a"`` for anything else.
    """
    if not isinstance(value, (int, float)):
        return "n/a"
    return f"{value:.2%}"
def _best_higher(results: list[dict[str, Any]], metric: str) -> str:
    """Describe the result(s) with the highest value of *metric*.

    Only results whose value for *metric* is an ``int`` or ``float`` are
    considered. All results sharing the highest value are listed, in input
    order, so that a tie is reported the same way ``_best_lower`` reports
    it rather than crediting whichever tied result happens to come first.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.
        metric: Name of the metric to maximise.

    Returns:
        ``"<name>[, <name>...] (<formatted value>)"``, or
        ``"not available"`` when no result has a numeric value.
    """
    scored = [
        result
        for result in results
        if isinstance(result["metrics"].get(metric), (int, float))
    ]
    if not scored:
        return "not available"

    best_value = max(result["metrics"][metric] for result in scored)
    tied = [result for result in scored if result["metrics"][metric] == best_value]
    names = ", ".join(str(result["name"]) for result in tied)
    return f"{names} ({_format_metric(best_value)})"
def _best_lower(results: list[dict[str, Any]], metric: str) -> str:
    """Describe the result(s) with the lowest value of *metric*.

    Only results whose value for *metric* is an ``int`` or ``float`` are
    considered. Every result sharing the lowest value is listed, in input
    order.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.
        metric: Name of the metric to minimise.

    Returns:
        ``"<name>[, <name>...] (<formatted value>)"``, or
        ``"not available"`` when no result has a numeric value.
    """
    numeric = [
        entry
        for entry in results
        if isinstance(entry["metrics"].get(metric), (int, float))
    ]
    if not numeric:
        return "not available"

    lowest = min(entry["metrics"][metric] for entry in numeric)
    winners = [entry["name"] for entry in numeric if entry["metrics"][metric] == lowest]
    return f"{', '.join(winners)} ({_format_metric(lowest)})"
def _best_safety(results: list[dict[str, Any]]) -> str:
    """Describe the safest result(s).

    Results are ranked by highest ``unsafe_rejection_accuracy``, with lowest
    ``false_route_rate`` as the tie-breaker. Only results where both metrics
    are ``int`` or ``float`` take part. Every result matching the winner on
    both metrics is listed, in input order.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        The winning name(s) followed by both formatted metrics, or
        ``"not available"`` when no result has both metrics.
    """

    def is_number(value: Any) -> bool:
        return isinstance(value, (int, float))

    def safety_rank(entry: dict[str, Any]) -> tuple[Any, Any]:
        # Negating the false route rate lets one max() prefer lower rates.
        metrics = entry["metrics"]
        return metrics["unsafe_rejection_accuracy"], -metrics["false_route_rate"]

    eligible = [
        entry
        for entry in results
        if is_number(entry["metrics"].get("unsafe_rejection_accuracy"))
        and is_number(entry["metrics"].get("false_route_rate"))
    ]
    if not eligible:
        return "not available"

    leader = max(eligible, key=safety_rank)["metrics"]
    top_rejection = leader["unsafe_rejection_accuracy"]
    lowest_false_route = leader["false_route_rate"]

    winners = [
        entry["name"]
        for entry in eligible
        if entry["metrics"]["unsafe_rejection_accuracy"] == top_rejection
        and entry["metrics"]["false_route_rate"] == lowest_false_route
    ]
    label = "model" if len(winners) <= 1 else "models"
    rejection_text = _format_metric(top_rejection)
    false_route_text = _format_metric(lowest_false_route)
    return (
        f"{', '.join(winners)} "
        f"({label}; unsafe rejection {rejection_text}, "
        f"false route {false_route_text})"
    )
def _false_route_summary(results: list[dict[str, Any]]) -> str:
    """Summarise the false route rate across *results*.

    Only results whose ``false_route_rate`` is an ``int`` or ``float`` are
    considered.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        * ``"not available"`` when no result has a numeric false route
          rate, so the report does not claim the rate "remained low"
          without any data.
        * A "remained low" sentence naming the best result(s) when no
          measured rate is above zero.
        * Otherwise the best result(s) plus the result with the highest
          observed rate.
    """
    measured = [
        result
        for result in results
        if isinstance(result["metrics"].get("false_route_rate"), (int, float))
    ]
    if not measured:
        return "not available"

    best = _best_lower(results, "false_route_rate")
    nonzero = [result for result in measured if result["metrics"]["false_route_rate"] > 0]
    if not nonzero:
        return f"remained low across available results; best is {best}"

    worst = max(nonzero, key=lambda result: result["metrics"]["false_route_rate"])
    return (
        f"best is {best}; highest observed is {worst['name']} "
        f"({_format_metric(worst['metrics']['false_route_rate'])})"
    )
def _next_improvement(results: list[dict[str, Any]]) -> str:
    """Suggest which area to improve next.

    A reference result is picked among results that have numeric
    ``unsafe_rejection_accuracy``, ``false_route_rate`` and
    ``required_field_presence_accuracy``: preferably among those with perfect
    safety (rejection accuracy 1.0 and false route rate 0.0), otherwise among
    all of them, taking the one with the highest
    ``required_field_presence_accuracy``.

    Args:
        results: Entries carrying ``name`` and ``metrics`` keys.

    Returns:
        * A hint to run an evaluation when no result qualifies.
        * A "reduce false routes" message when the reference result has a
          false route rate above zero.
        * Otherwise the description of the reference result's weakest
          metric among workflow, status, extraction and unsafe rejection.
    """

    def is_number(value: Any) -> bool:
        return isinstance(value, (int, float))

    required = (
        "unsafe_rejection_accuracy",
        "false_route_rate",
        "required_field_presence_accuracy",
    )
    scored = [
        result
        for result in results
        if all(is_number(result["metrics"].get(name)) for name in required)
    ]
    if not scored:
        return "run at least one evaluation to identify the weakest metric"

    safe_candidates = [
        result
        for result in scored
        if result["metrics"]["unsafe_rejection_accuracy"] == 1.0
        and result["metrics"]["false_route_rate"] == 0.0
    ]
    candidates = safe_candidates or scored
    reference = max(
        candidates,
        key=lambda result: result["metrics"]["required_field_presence_accuracy"],
    )["metrics"]

    # Safety first: a reference that still mis-routes outranks any accuracy gap.
    if reference["false_route_rate"] > 0:
        return "reduce false routes before optimizing convenience metrics"

    weaknesses = {
        "workflow_accuracy": "workflow classification",
        "status_accuracy": "status classification",
        "required_field_presence_accuracy": "structured parameter extraction",
        "unsafe_rejection_accuracy": "unsafe request rejection",
    }

    def metric_score(metric: str) -> Any:
        # A metric may be present with a None value, which dict.get's default
        # does not cover; treat any non-numeric value as a perfect 1.0 so that
        # min() never compares None with a number.
        value = reference.get(metric)
        return value if is_number(value) else 1.0

    return weaknesses[min(weaknesses, key=metric_score)]
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the comparison script.

    Returns:
        A namespace with ``results_dir`` and ``output``, both converted to
        ``Path`` and defaulting to ``DEFAULT_RESULTS_DIR`` and
        ``DEFAULT_OUTPUT_PATH`` respectively.
    """
    cli = argparse.ArgumentParser(
        description="Compare RouterCore evaluation result JSON files.",
    )
    path_options = (
        ("--results-dir", DEFAULT_RESULTS_DIR),
        ("--output", DEFAULT_OUTPUT_PATH),
    )
    for flag, default_path in path_options:
        cli.add_argument(flag, type=Path, default=default_path)
    return cli.parse_args()
def main() -> None:
    """Generate the comparison report and echo it to standard output.

    Parses the command line, writes the report file, prints the report and
    the path it was written to, and prints an extra notice when no result
    entries were loaded.
    """
    options = parse_args()
    written_to, loaded, markdown = write_report(options.results_dir, options.output)

    print(markdown)
    print(f"\nWrote comparison report to {written_to}")
    if not loaded:
        print("No result files were found.")


# Run the report generation only when executed as a script.
if __name__ == "__main__":
    main()