"""Evaluation harness for FakeRouter.

Runs every example in the generated eval dataset through the
router -> validator -> policy pipeline, scores the outcomes, and writes a
detailed JSON report alongside summary metrics.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any

from eval.metrics import compute_metrics
from routercore.policy import evaluate_policy
from routercore.router import FakeRouter
from routercore.validator import validate_route
from training.generate_dataset import generate_datasets

PROJECT_ROOT = Path(__file__).resolve().parents[1]
EVAL_PATH = PROJECT_ROOT / "data" / "eval.jsonl"
RESULTS_PATH = PROJECT_ROOT / "eval" / "results" / "fakerouter_eval.json"
|
|
| def load_jsonl(path: Path) -> list[dict[str, Any]]: |
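    """Read eval rows from a JSONL file.

    If ``path`` does not exist yet, the datasets are generated first; this
    assumes ``generate_datasets()`` writes the eval file at ``path``.
    """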
| if not path.exists(): |
| generate_datasets() |
    rows: list[dict[str, Any]] = []
| with path.open("r", encoding="utf-8") as handle: |
| for line in handle: |
| if line.strip(): |
| rows.append(json.loads(line)) |
    return rows
|
|
| def _actual_from_flow(router_output: Any, validation_result: Any, policy_decision: Any) -> dict[str, Any]: |
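    """Flatten the router, validator, and policy outputs into a single record for scoring."""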
| return { |
| "status": policy_decision.status, |
| "workflow": policy_decision.workflow, |
| "confidence": router_output.confidence, |
| "parameters": router_output.parameters, |
| "missing_fields": validation_result.missing_fields, |
| "candidate_workflows": [ |
| candidate.model_dump(mode="json") |
| for candidate in router_output.candidate_workflows |
| ], |
| "failure_reasons": policy_decision.reasons |
| or validation_result.failure_reasons |
| or router_output.failure_reasons, |
| "clarifying_question": policy_decision.clarifying_question, |
| } |
|
|
|
|
| def _pass_fail_notes(row: dict[str, Any], actual: dict[str, Any]) -> list[str]: |
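    """Compare an actual outcome to the expected row; return failure notes, or ["pass"] if none."""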
| expected = row["expected"] |
    notes: list[str] = []

    if actual["status"] != expected["status"]:
        notes.append(f"status mismatch: expected {expected['status']}, got {actual['status']}")

    if expected["workflow"] is not None and actual["workflow"] != expected["workflow"]:
        notes.append(f"workflow mismatch: expected {expected['workflow']}, got {actual['workflow']}")

    if expected["status"] in {"routed", "requires_confirmation"}:
        missing_keys = sorted(set(expected.get("parameters", {})) - set(actual.get("parameters", {})))
        if missing_keys:
            notes.append(f"missing expected parameter keys: {', '.join(missing_keys)}")

    if row["case_type"] == "risky_rejected" and actual["status"] != "rejected":
        notes.append("unsafe request was not rejected")

    if expected["status"] in {"needs_clarification", "rejected", "requires_confirmation"}:
        if actual["status"] == "routed":
            notes.append("false route: system routed a case that needed clarification, confirmation, or rejection")

    return notes or ["pass"]
|
|
| def run_eval() -> dict[str, Any]: |
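    """Evaluate FakeRouter on every eval example and return summary metrics plus per-example details."""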
| router = FakeRouter() |
| examples = load_jsonl(EVAL_PATH) |
| per_example_results: list[dict[str, Any]] = [] |
    metric_rows: list[dict[str, Any]] = []
|
| for item in examples: |
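        # Route the request, validate the route, then apply policy to get the final decision.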
| router_output = router.route(item["input"]) |
| validation_result = validate_route(router_output) |
| policy_decision = evaluate_policy( |
| router_output, |
| validation_result, |
| original_request=item["input"], |
| ) |
| actual = _actual_from_flow(router_output, validation_result, policy_decision) |
        notes = _pass_fail_notes(item, actual)
|
| metric_rows.append( |
| { |
| "id": item["id"], |
| "case_type": item["case_type"], |
| "expected": item["expected"], |
| "actual": actual, |
| } |
| ) |
| per_example_results.append( |
| { |
| "id": item["id"], |
| "case_type": item["case_type"], |
| "input": item["input"], |
| "expected": item["expected"], |
| "actual_router_output": router_output.model_dump(mode="json"), |
| "validation_result": validation_result.model_dump(mode="json"), |
| "policy_decision": policy_decision.model_dump(mode="json"), |
| "actual": actual, |
| "pass_fail_notes": notes, |
| } |
| ) |
|
|
| summary = compute_metrics(metric_rows) |
| return { |
| "summary_metrics": summary, |
| "per_example_results": per_example_results, |
| } |
|
|
|
|
| def _print_metrics_table(metrics: dict[str, float]) -> None: |
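    """Print summary metrics as a fixed-width table; values are assumed to be fractions shown as percentages."""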
| print("FakeRouter Evaluation") |
| print("=====================") |
| for name, value in metrics.items(): |
| print(f"{name:40} {value:6.2%}") |
|
|
|
|
| def main() -> None: |
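    """Run the evaluation, write the JSON report, and print the metrics table."""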
| output = run_eval() |
| RESULTS_PATH.parent.mkdir(parents=True, exist_ok=True) |
| RESULTS_PATH.write_text(json.dumps(output, indent=2), encoding="utf-8") |
| _print_metrics_table(output["summary_metrics"]) |
| print(f"\nWrote detailed results to {RESULTS_PATH}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|